程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> 關於C語言 >> C++ 線程池實現

C++ 線程池實現

編輯:關於C語言

使用多線程編程可以顯著提高程序的運行速度,由於現在的操作系統都是多核的,所以一個多線程的程序,由於系統內核是基於時間片輪詢的,所以多線程程序再用系統內核的時間大大增多,所完成的任務就更快。

 

線程池頭文件:

 

//---------------------------------------------------------------------------
#ifndef CworkQueueH
#define CworkQueueH
//---------------------------------------------------------------------------
#include <queue>
#include<vcl.h>
class CWorkQueue;
/**
用法原理:通過派生類WorkItemBase的dowork方法來實現,線程處理任務
通過create任務創建線程,並且這些線程一直在for循環裡等待事件監聽
一旦任務棧裡有數據了觸發線程執行任務。
**/
/*------------------------------------------------------------------------
WorkItemBase
this is the basic WorkItem that the Work Queue Use its interface
This class should be inherited and these virtual abstract functions
implemented.
DoWork()
virtual abstract function is the function that is called when the
work item turn has came to be poped out of the queue and be processed.
Abort ()
This function is called, when the Destroy function is called, for each of the WorkItems
That are left in the queue.
------------------------------------------------------------------------*/
class WorkItemBase
{
virtual void   DoWork(void* pThreadContext)    = 0;
virtual void   Abort () = 0;
friend CWorkQueue;
};
typedef std::queue<WorkItemBase*>           WorkItemQueue,*PWorkItemQueue;
/*------------------------------------------------------------------------
CWorkQueue
This is the WorkOueue class also known as thread pool,
the basic idea of this class is creating thread that are waiting on a queue
of work item when the queue is inserted with items the threads wake up and
perform the requered work and go to sleep again.
------------------------------------------------------------------------*/
class  CWorkQueue
{
public:
virtual ~CWorkQueue(){};
bool Create(const unsigned int       nNumberOfThreads,
void*                    *pThreadDataArray             = NULL);
bool InsertWorkItem(WorkItemBase* pWorkItem);
void Destroy(int iWairSecond);
int GetThreadTotalNum();
private:
static unsigned long __stdcall ThreadFunc( void* pParam );
WorkItemBase* RemoveWorkItem();
int GetWorekQueueSize();
enum{
ABORT_EVENT_INDEX = 0,
SEMAPHORE_INDEX,
NUMBER_OF_SYNC_OBJ,
};
//申請到的線程
PHANDLE                  m_phThreads;
unsigned int             m_nNumberOfThreads;
void*                    m_pThreadDataArray;
HANDLE                   m_phSincObjectsArray[NUMBER_OF_SYNC_OBJ];
CRITICAL_SECTION         m_CriticalSection;
PWorkItemQueue           m_pWorkItemQueue;
};
#endif

CPP實現

//---------------------------------------------------------------------------
#pragma hdrstop
#include "CworkQueue.h"
//---------------------------------------------------------------------------
#include <assert.h>
typedef struct _THREAD_CONTEXT
{
CWorkQueue* pWorkQueue;
void*       pThreadData;
} THREAD_CONTEXT,*PTHREAD_CONTEXT;
/*------------------------------------------------------------------------
建立多線程   nNumberOfThreads多線程數目  ThreadData線程函數執行的參數
------------------------------------------------------------------------*/
bool CWorkQueue::Create(const unsigned int  nNumberOfThreads,
void*         *ThreadData      /*=NULL*/)
{
//創建任務隊列,存放後續將要執行的任務
m_pWorkItemQueue = new WorkItemQueue();
if(NULL == m_pWorkItemQueue )
{
return false;
}
//m_phSincObjectsArray保存了線程池的信號量和事件
//m_phSincObjectsArray[ABORT_EVENT_INDEX]保存的是事件,當用戶設置退出事件時使用
//m_phSincObjectsArray[SEMAPHORE_INDEX]保存信號量,當用戶設置執行任務時使用
//創建信號量多線程同步使用)
/*在信號量上我們定義兩種操作: Wait等待) 和 Release釋放)。
當一個線程調用Wait操作時,它要麼得到資源然後將信號量減一,要麼一直等下去指放入阻塞隊列),
直到信號量大於等於一時。Release釋放)實際上是在信號量上執行加操作*/
m_phSincObjectsArray[SEMAPHORE_INDEX] = CreateSemaphore(NULL,0,LONG_MAX,NULL);
if(m_phSincObjectsArray[SEMAPHORE_INDEX] == NULL)
{
delete m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
return false;
}
//創建事件為手動置位,一次只能進入一個,False為初始不是運行狀態多線程同步使用)
m_phSincObjectsArray[ABORT_EVENT_INDEX] = CreateEvent(NULL,TRUE,FALSE,NULL);
if(m_phSincObjectsArray[ABORT_EVENT_INDEX]  == NULL)
{
delete m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
return false;
}
//創建並初始化臨界區多線程互斥訪問使用)
InitializeCriticalSection(&m_CriticalSection);
//創建線程數組
m_phThreads = new HANDLE[nNumberOfThreads];
if(m_phThreads == NULL)
{
delete m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
CloseHandle(m_phSincObjectsArray[ABORT_EVENT_INDEX]);
DeleteCriticalSection(&m_CriticalSection);
return false;
}
unsigned int i;
m_nNumberOfThreads = nNumberOfThreads;
DWORD dwThreadId;
PTHREAD_CONTEXT pThreadsContext;
//創建所有的線程
for(i = 0 ; i < nNumberOfThreads ; i++ )
{
//初始化線程函數運行時傳入的參數
pThreadsContext = new THREAD_CONTEXT;
pThreadsContext->pWorkQueue  = this;
pThreadsContext->pThreadData = ThreadData == NULL? NULL : ThreadData[i];
//創建線程
m_phThreads[i] = CreateThread(NULL,
0,
CWorkQueue::ThreadFunc,
pThreadsContext,
0,
&dwThreadId);
if(m_phThreads[i] == NULL)
{
delete pThreadsContext;
m_nNumberOfThreads = i;
Destroy(5);
return false;
}
}
return true;
}
/*------------------------------------------------------------------------
向任務隊列添加任務
任務執行類通過繼承基類WorkItemBase之後使用多態函數DoWork來完成真實任務
------------------------------------------------------------------------*/
bool CWorkQueue::InsertWorkItem(WorkItemBase* pWorkItem)
{
assert(pWorkItem != NULL);
//多線程互斥訪問,進入臨界區
EnterCriticalSection(&m_CriticalSection);
//將任務插入隊列
m_pWorkItemQueue->push(pWorkItem);
//離開臨界區
LeaveCriticalSection(&m_CriticalSection);
//釋放信號量,使信號量加1,促使後面的Wailt操作執行
if (!ReleaseSemaphore(m_phSincObjectsArray[SEMAPHORE_INDEX],1,NULL))
{
assert(false);
return false;
}
return true;
}
/*------------------------------------------------------------------------
從工作隊列中移除對象,並返回移除的對象
------------------------------------------------------------------------*/
WorkItemBase*  CWorkQueue::RemoveWorkItem()
{
WorkItemBase* pWorkItem;
//多線程間訪問互斥,進入臨界區
EnterCriticalSection(&m_CriticalSection);
//移除對象
pWorkItem = m_pWorkItemQueue->front();
m_pWorkItemQueue->pop();
//離開臨界區,其他等待線程可以進入臨界區
LeaveCriticalSection(&m_CriticalSection);
assert(pWorkItem != NULL);
return pWorkItem;
}
/*------------------------------------------------------------------------
線程執行的函數,實際執行的是任務隊列中的DoWork()
------------------------------------------------------------------------*/
unsigned long __stdcall CWorkQueue::ThreadFunc( void*  pParam )
{
//創建線程時傳入的參數
PTHREAD_CONTEXT       pThreadContext =  (PTHREAD_CONTEXT)pParam;
WorkItemBase*         pWorkItem      = NULL;
CWorkQueue*           pWorkQueue     = pThreadContext->pWorkQueue;
void*                 pThreadData    = pThreadContext->pThreadData;
DWORD dwWaitResult;
for(;;)
{
//WaitForMultipleObjects等待pWorkQueue->m_phSincObjectsArray信號量數組兩件事
//一個是執行任務的釋放信號量,一個是異常的釋放信號量
//當WaitForMultipleObjects等到多個內核對象的時候,如果它的bWaitAll 參數設置為false。
//其返回值減去WAIT_OBJECT_0 就是參數lpHandles數組的序號。如果同時有多個內核對象被觸發,
//這個函數返回的只是其中序號最小的那個。如果為TRUE 則等待所有信號量有效在往下執行。
//FALSE 當有其中一個信號量有效時就向下執行)
//本文WaitForMultipleObjects等待執行任務的信號量和退出銷毀任務信息的事件
//當有新任務添加到任務隊列,設置執行任務信號量,觸發任務執行
//當設置退出事件時,銷毀任務信息,所有線程因為沒有設置事件復位信息,因此會全部銷毀
dwWaitResult = WaitForMultipleObjects(NUMBER_OF_SYNC_OBJ,pWorkQueue->m_phSincObjectsArray,FALSE,INFINITE);
//WaitForMultipleObjects返回數組pWorkQueue->m_phSincObjectsArray編號
switch(dwWaitResult - WAIT_OBJECT_0)
{
//返回異常編號
case ABORT_EVENT_INDEX:
delete pThreadContext;
return 0;
//返回執行任務編號
case SEMAPHORE_INDEX:
//從任務隊列取一個任務執行
pWorkItem = pWorkQueue->RemoveWorkItem();
if(pWorkItem == NULL)
{
assert(false);
break;
}
//執行真正的任務
pWorkItem->DoWork(pThreadData);
break;
default:
assert(false);
delete pThreadContext;
return 0;
}
}
//刪除線程參數
delete pThreadContext;
return 1;
}
/**
獲取線程總數
**/
int CWorkQueue::GetThreadTotalNum()
{
return m_nNumberOfThreads;
}
/**
獲取任務池的大小
**/
int CWorkQueue::GetWorekQueueSize()
{
//多線程間訪問互斥,進入臨界區
EnterCriticalSection(&m_CriticalSection);
int iWorkQueueSize = m_pWorkItemQueue->size();
//離開臨界區
LeaveCriticalSection(&m_CriticalSection);
return iWorkQueueSize;
}
/*------------------------------------------------------------------------
Destroy
銷毀線程池
------------------------------------------------------------------------*/
void CWorkQueue::Destroy(int iWairSecond)
{
//為防止子線程任務沒有執行完,主線程就銷毀線程池,就加入一個等待函數
while(0 != GetWorekQueueSize())
{
//主線程等待線程池完成所有任務
Sleep(iWairSecond*1000);
}
//設置退出事件,所有線程因為沒有設置事件復位信息,因此會全部銷毀
if(!SetEvent(m_phSincObjectsArray[ABORT_EVENT_INDEX]))
{
assert(false);
return;
}
//wait for all the threads to end
WaitForMultipleObjects(m_nNumberOfThreads,m_phThreads,true,INFINITE);
//clean queue
while(!m_pWorkItemQueue->empty())
{
m_pWorkItemQueue->front()->Abort();
m_pWorkItemQueue->pop();
}
delete m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
CloseHandle(m_phSincObjectsArray[ABORT_EVENT_INDEX]);
DeleteCriticalSection(&m_CriticalSection);
//close all threads handles
for(int i = 0 ; i < m_nNumberOfThreads ; i++)
CloseHandle(m_phThreads[i]);
delete[] m_phThreads;
}
#pragma package(smart_init)

 

 

使用方式:

//---------------------------------------------------------------------------
#ifndef CworkItemH
#define CworkItemH
#include "CworkQueue.h"
#include "OprateEXCEL.h"
//---------------------------------------------------------------------------
class CworkItem :public WorkItemBase
{
public:
void   DoWork(void* pThreadContext);
void   Abort ();
void  SetWriteContent(const vTExcelSheetDate &tvTExcelSheetDate);
void  SetExcelPath(const String &ExcelPath);
private:
//要寫入Excel的內容
vTExcelSheetDate m_vTExcelSheetDate;
//Excel文件的路徑
String m_ExcelPath;
};
#endif
//---------------------------------------------------------------------------
#pragma hdrstop
#include "CworkItem.h"
//---------------------------------------------------------------------------
/**************************************
****************************************
函數功能:    保存Excel寫入內容
****************************************
作者:時間:2013.6.30
****************************************
****************************************/
void  CworkItem::SetWriteContent(const vTExcelSheetDate &tvTExcelSheetDate)
{
m_vTExcelSheetDate.clear();
vTExcelSheetDate().swap(m_vTExcelSheetDate);
m_vTExcelSheetDate =  tvTExcelSheetDate;
}
/**************************************
****************************************
函數功能:    保存Excel保存路徑
****************************************
作者:時間:2013.6.30
****************************************
****************************************/
void  CworkItem::SetExcelPath(const String &ExcelPath)
{
m_ExcelPath =  ExcelPath;
}
/**************************************
****************************************
函數功能:    實現基類的工作方法
****************************************
作者:    時間:2013.6.30
****************************************
****************************************/
void  CworkItem::DoWork(void* pThreadContext)
{
OperateExcel taOperateExcel;
String sError;
taOperateExcel.WriteDateToExcel(m_ExcelPath,m_vTExcelSheetDate,sError);
//工作完成後,自我了斷
delete this;
}
void CworkItem::Abort ()
{
delete this;
}

 

 

 

CworkItem *pCworkItem = new CworkItem();
pCworkItem->SetWriteContent(tvTExcelSheetDate);
pCworkItem->SetExcelPath(sFilePath);
m_CWorkQueue.InsertWorkItem(pCworkItem);
pCworkItem = new CworkItem();
pCworkItem->SetWriteContent(tvTExcelSheetDate);
sFilePath = "F:\\Project\\多線程寫excel\\song1.xls";
pCworkItem->SetExcelPath(sFilePath);
m_CWorkQueue.InsertWorkItem(pCworkItem);
m_CWorkQueue.Destroy(5);

 

本文出自 “風清揚song” 博客,請務必保留此出處http://2309998.blog.51cto.com/2299998/1264085

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved