Thread Pool Principles and Implementation in Detail
一. Introduction to Thread Pools
The usual way we use multithreading is to create a new thread when one is needed, execute a specific task in that thread, and exit when the task is done. For ordinary applications this is good enough; after all, we rarely need to create a large number of threads only to destroy each one after it has executed a single simple task.
In web, email, or database applications, however — a ring-back-tone service, for example — the application must be ready at any moment to handle a huge number of connection requests, while the task behind each request may be extremely simple and take very little processing time. The application can then find itself constantly creating and destroying threads. Although creating a thread is far cheaper than creating a process, if threads must be created frequently and each one runs for only a very short time, the extra load that thread creation and destruction place on the processor is still considerable.
The purpose of a thread pool is precisely to reduce, in such scenarios, the overhead of frequently creating and destroying threads. A thread pool normally uses a pre-creation technique: a certain number of threads are created in advance when the application starts. While the application runs, it can request an idle thread from the pool to carry out a task; when the task finishes, the thread is not destroyed but returned to the pool, which manages it. If all pre-allocated threads have been handed out and a new task request arrives, the pool dynamically creates new threads to meet the demand. Of course, during some periods the application may have little work to do, leaving most of the pool's threads idle; to save system resources, the pool then needs to dynamically destroy some of those idle threads. A thread pool therefore needs a manager that maintains the number of threads according to some policy.
With this technique, the thread pool amortizes the cost of thread creation and destruction over the tasks actually executed: the more tasks run, the smaller the share of overhead each task carries.
Of course, if the cost of creating and destroying a thread is negligible compared with the cost of the task it executes, a thread pool is unnecessary — in FTP or Telnet applications, for example.
二. Thread Pool Design
Below, C is used to implement a simple thread pool. To make the library more convenient to use, some object-oriented ideas are mixed into the C implementation. Unlike Objective-C, it merely uses a struct to imitate a C++ class; this style is in fact widely seen in the Linux kernel.
In this library, the interfaces relevant to the user are:
typedef struct tp_work_desc_s tp_work_desc; //info the application needs when a thread runs a task
typedef struct tp_work_s tp_work; //the task a thread executes
typedef struct tp_thread_info_s tp_thread_info; //per-thread info: id, idle/busy, current task, etc.
typedef struct tp_thread_pool_s tp_thread_pool; //interfaces for thread pool operations
//thread param
struct tp_work_desc_s{
……
};
//base thread struct
struct tp_work_s{
//main process function. user interface
void (*process_job)(tp_work *this, tp_work_desc *job);
};
tp_thread_pool *creat_thread_pool(int min_num, int max_num);
tp_work_desc_s holds the information an application thread needs while executing a task; it is passed to each thread as its argument, its contents vary from application to application, and the user must define the structure's fields. tp_work_s is the task we want a thread to execute. When we request a new thread, we must first specify these two structures: what task the thread performs, and what extra information that task needs. The interface function creat_thread_pool creates a thread pool instance; the caller specifies the minimum number of threads min_num and the maximum number max_num the instance may hold. The minimum is the number of threads pre-created when the pool is built, and its size directly affects how effective the pool is: if it is too small, the pre-created threads are handed out quickly and new threads must be created to keep up with incoming requests; if it is too large, the pool may hold many idle threads. It should be chosen according to the actual needs of the application. The structure describing the thread pool is as follows:
//main thread pool struct
struct tp_thread_pool_s{
TPBOOL (*init)(tp_thread_pool *this);
void (*close)(tp_thread_pool *this);
void (*process_job)(tp_thread_pool *this, tp_work *worker, tp_work_desc *job);
int (*get_thread_by_id)(tp_thread_pool *this, int id);
TPBOOL (*add_thread)(tp_thread_pool *this);
TPBOOL (*delete_thread)(tp_thread_pool *this);
int (*get_tp_status)(tp_thread_pool *this);
int min_th_num; //min thread number in the pool
int cur_th_num; //current thread number in the pool
int max_th_num; //max thread number in the pool
pthread_mutex_t tp_lock;
pthread_t manage_thread_id; //manage thread id num
tp_thread_info *thread_info; //work thread relative thread info
};
The structure tp_thread_info_s records each thread's id, whether it is idle, the task it is executing, and so on; the user does not need to care about it.
//thread info
struct tp_thread_info_s{
pthread_t thread_id; //thread id num
TPBOOL is_busy; //thread status:true-busy;false-idle
pthread_cond_t thread_cond;
pthread_mutex_t thread_lock;
tp_work *th_work;
tp_work_desc *th_job;
};
The tp_thread_pool_s structure contains the interfaces and variables for thread pool operations. After creat_thread_pool returns a pool instance, the instance must first be explicitly initialized through the init interface. During initialization the pool pre-creates the specified minimum number of threads; they all block, consuming no CPU, though they do occupy some memory. init also creates a management thread for the pool, which runs for the pool's whole lifetime; it periodically inspects the pool's state, and if too many threads are idle it deletes some of them, though never letting the total fall below the specified minimum.
Once the pool is created and initialized, we can fill in tp_work_desc_s and tp_work_s structures and execute them through the pool's process_job interface. That is everything we need to know in order to use this thread pool. When the pool is no longer needed, the close interface destroys it.
三. Implementation Code
Thread-pool.h (header file):
#include <stdio.h>
#include <stdlib.h>
#include <string.h> //for memset
#include <unistd.h> //for sleep
#include <sys/types.h>
#include <pthread.h>
#include <signal.h>
#ifndef TPBOOL
typedef int TPBOOL;
#endif
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif
#define BUSY_THRESHOLD 0.5 //(busy threads)/(all threads) ratio above which the pool counts as busy
#define MANAGE_INTERVAL 5 //manage thread check interval, in seconds
typedef struct tp_work_desc_s tp_work_desc;
typedef struct tp_work_s tp_work;
typedef struct tp_thread_info_s tp_thread_info;
typedef struct tp_thread_pool_s tp_thread_pool;
//thread param
struct tp_work_desc_s{
char *inum; //call in
char *onum; //call out
int chnum; //channel num
};
//base thread struct
struct tp_work_s{
//main process function. user interface
void (*process_job)(tp_work *this, tp_work_desc *job);
};
//thread info
struct tp_thread_info_s{
pthread_t thread_id; //thread id num
TPBOOL is_busy; //thread status:true-busy;false-idle
pthread_cond_t thread_cond;
pthread_mutex_t thread_lock;
tp_work *th_work;
tp_work_desc *th_job;
};
//main thread pool struct
struct tp_thread_pool_s{
TPBOOL (*init)(tp_thread_pool *this);
void (*close)(tp_thread_pool *this);
void (*process_job)(tp_thread_pool *this, tp_work *worker, tp_work_desc *job);
int (*get_thread_by_id)(tp_thread_pool *this, int id);
TPBOOL (*add_thread)(tp_thread_pool *this);
TPBOOL (*delete_thread)(tp_thread_pool *this);
int (*get_tp_status)(tp_thread_pool *this);
int min_th_num; //min thread number in the pool
int cur_th_num; //current thread number in the pool
int max_th_num; //max thread number in the pool
pthread_mutex_t tp_lock;
pthread_t manage_thread_id; //manage thread id num
tp_thread_info *thread_info; //work thread relative thread info
};
tp_thread_pool *creat_thread_pool(int min_num, int max_num);
Thread-pool.c (implementation file):
#include "thread-pool.h"
static void *tp_work_thread(void *pthread);
static void *tp_manage_thread(void *pthread);
static TPBOOL tp_init(tp_thread_pool *this);
static void tp_close(tp_thread_pool *this);
static void tp_process_job(tp_thread_pool *this, tp_work *worker, tp_work_desc *job);
static int tp_get_thread_by_id(tp_thread_pool *this, int id);
static TPBOOL tp_add_thread(tp_thread_pool *this);
static TPBOOL tp_delete_thread(tp_thread_pool *this);
static int tp_get_tp_status(tp_thread_pool *this);
/**
 * user interface. create a thread pool.
 * para:
 * min_num: min thread number to be created in the pool
 * max_num: max thread number allowed in the pool
 * return:
 * thread pool struct instance on success, NULL on failure
 */
tp_thread_pool *creat_thread_pool(int min_num, int max_num){
tp_thread_pool *this;
this = (tp_thread_pool*)malloc(sizeof(tp_thread_pool));
if(NULL == this)
return NULL;
memset(this, 0, sizeof(tp_thread_pool));
//init member function pointers
this->init = tp_init;
this->close = tp_close;
this->process_job = tp_process_job;
this->get_thread_by_id = tp_get_thread_by_id;
this->add_thread = tp_add_thread;
this->delete_thread = tp_delete_thread;
this->get_tp_status = tp_get_tp_status;
//init member vars
this->min_th_num = min_num;
this->cur_th_num = this->min_th_num;
this->max_th_num = max_num;
pthread_mutex_init(&this->tp_lock, NULL);
//malloc mem for max_th_num thread info structs (the struct was zeroed above, so the original NULL check and free here were dead code)
this->thread_info = (tp_thread_info*)malloc(sizeof(tp_thread_info)*this->max_th_num);
if(NULL == this->thread_info){
free(this);
return NULL;
}
//zero the array so every slot starts with is_busy == FALSE
memset(this->thread_info, 0, sizeof(tp_thread_info)*this->max_th_num);
return this;
}
/**
 * member function implementation. thread pool init function.
 * para:
 * this: thread pool struct instance pointer
 * return:
 * true: successful; false: failed
 */
TPBOOL tp_init(tp_thread_pool *this){
int i;
int err;
//creat work thread and init work thread info
for(i=0;i<this->min_th_num;i++){
pthread_cond_init(&this->thread_info[i].thread_cond, NULL);
pthread_mutex_init(&this->thread_info[i].thread_lock, NULL);
err = pthread_create(&this->thread_info[i].thread_id, NULL, tp_work_thread, this);
if(0 != err){
printf("tp_init: creat work thread failed\n");
return FALSE;
}
printf("tp_init: creat work thread %lu\n", (unsigned long)this->thread_info[i].thread_id);
}
//creat manage thread
err = pthread_create(&this->manage_thread_id, NULL, tp_manage_thread, this);
if(0 != err){
printf("tp_init: creat manage thread failed\n");
return FALSE;
}
printf("tp_init: creat manage thread %lu\n", (unsigned long)this->manage_thread_id);
return TRUE;
}
/**
 * member function implementation. closes the whole thread pool.
 * para:
 * this: thread pool struct instance pointer
 * return:
 */
void tp_close(tp_thread_pool *this){
int i;
//close work threads; pthread_cancel, not kill(): kill() takes a process id and SIGKILL would end the whole process
for(i=0;i<this->cur_th_num;i++){
pthread_cancel(this->thread_info[i].thread_id);
pthread_mutex_destroy(&this->thread_info[i].thread_lock);
pthread_cond_destroy(&this->thread_info[i].thread_cond);
printf("tp_close: cancel work thread %lu\n", (unsigned long)this->thread_info[i].thread_id);
}
//close manage thread
pthread_cancel(this->manage_thread_id);
pthread_mutex_destroy(&this->tp_lock);
printf("tp_close: cancel manage thread %lu\n", (unsigned long)this->manage_thread_id);
//free thread struct
free(this->thread_info);
}
/**
 * member function implementation. main public interface.
 * after setting up a worker and job, the user calls this function to process the task.
 * para:
 * this: thread pool struct instance pointer
 * worker: user task implementation.
 * job: user task para
 * return:
 */
void tp_process_job(tp_thread_pool *this, tp_work *worker, tp_work_desc *job){
int i;
int tmpid;
//fill this->thread_info's relative work key
for(i=0;i<this->cur_th_num;i++){
pthread_mutex_lock(&this->thread_info[i].thread_lock);
if(!this->thread_info[i].is_busy){
printf("tp_process_job: %d thread idle, thread id is %lu\n", i, (unsigned long)this->thread_info[i].thread_id);
//thread state be set busy before work
this->thread_info[i].is_busy = TRUE;
pthread_mutex_unlock(&this->thread_info[i].thread_lock);
this->thread_info[i].th_work = worker;
this->thread_info[i].th_job = job;
printf("tp_process_job: informing idle working thread %d, thread id is %lu\n", i, (unsigned long)this->thread_info[i].thread_id);
pthread_cond_signal(&this->thread_info[i].thread_cond);
return;
}
else
pthread_mutex_unlock(&this->thread_info[i].thread_lock);
}//end of for
//if all current threads are busy, a new thread is created here
pthread_mutex_lock(&this->tp_lock);
if( this->add_thread(this) ){
i = this->cur_th_num - 1;
tmpid = this->thread_info[i].thread_id;
this->thread_info[i].th_work = worker;
this->thread_info[i].th_job = job;
pthread_mutex_unlock(&this->tp_lock);
//send cond to the new work thread
printf("tp_process_job: informing new working thread %d, thread id is %lu\n", i, (unsigned long)this->thread_info[i].thread_id);
pthread_cond_signal(&this->thread_info[i].thread_cond);
}
else{
//the pool is already at max_th_num; signalling thread_info[i] here would touch an
//uninitialized slot, so the job is dropped - a production version would queue it
pthread_mutex_unlock(&this->tp_lock);
}
return;
}
/**
 * member function implementation. map a thread id to its slot in the pool.
 * para:
 * this: thread pool struct instance pointer
 * id: thread id num
 * return:
 * seq num in thread info struct array, or -1 if not found
 */
int tp_get_thread_by_id(tp_thread_pool *this, int id){
int i;
for(i=0;i<this->cur_th_num;i++){
if(id == this->thread_info[i].thread_id)
return i;
}
return -1;
}
/**
 * member function implementation. add a new thread into the pool.
 * para:
 * this: thread pool struct instance pointer
 * return:
 * true: successful; false: failed
 */
static TPBOOL tp_add_thread(tp_thread_pool *this){
int err;
tp_thread_info *new_thread;
if( this->max_th_num <= this->cur_th_num )
return FALSE;
//take the next preallocated slot in the thread_info array
new_thread = &this->thread_info[this->cur_th_num];
//init new thread's cond & mutex
pthread_cond_init(&new_thread->thread_cond, NULL);
pthread_mutex_init(&new_thread->thread_lock, NULL);
//init status is busy
new_thread->is_busy = TRUE;
//add current thread number in the pool.
this->cur_th_num++;
err = pthread_create(&new_thread->thread_id, NULL, tp_work_thread, this);
if(0 != err){
//new_thread points into the preallocated array, so it must not be free()d; just roll back the count
this->cur_th_num--;
return FALSE;
}
printf("tp_add_thread: creat work thread %lu\n", (unsigned long)new_thread->thread_id);
return TRUE;
}
/**
 * member function implementation. delete an idle thread from the pool.
 * only the last thread in the pool is deleted, and only if idle.
 * para:
 * this: thread pool struct instance pointer
 * return:
 * true: successful; false: failed
 */
static TPBOOL tp_delete_thread(tp_thread_pool *this){
//current thread num can't go below min thread num
if(this->cur_th_num <= this->min_th_num) return FALSE;
//if the last thread is busy, do nothing
if(this->thread_info[this->cur_th_num-1].is_busy) return FALSE;
//cancel the idle thread (pthread_cancel, not kill(): kill() acts on the whole process) and destroy its sync objects
pthread_cancel(this->thread_info[this->cur_th_num-1].thread_id);
pthread_mutex_destroy(&this->thread_info[this->cur_th_num-1].thread_lock);
pthread_cond_destroy(&this->thread_info[this->cur_th_num-1].thread_cond);
//after deleting the idle thread, current thread num -1
this->cur_th_num--;
return TRUE;
}
/**
 * member function implementation. get current thread pool status: idle, normal, busy, etc.
 * para:
 * this: thread pool struct instance pointer
 * return:
 * 0: idle; 1: normal or busy
 */
static int tp_get_tp_status(tp_thread_pool *this){
float busy_num = 0.0;
int i;
//get busy thread number
for(i=0;i<this->cur_th_num;i++){
if(this->thread_info[i].is_busy)
busy_num++;
}
//compare the busy ratio against BUSY_THRESHOLD
if(busy_num/(this->cur_th_num) < BUSY_THRESHOLD)
return 0;//idle status
else
return 1;//busy or normal status
}
/**
 * internal interface. the work thread body.
 * para:
 * pthread: thread pool struct pointer
 * return:
 */
static void *tp_work_thread(void *pthread){
pthread_t curid;//current thread id
int nseq;//current thread seq in the this->thread_info array
tp_thread_pool *this = (tp_thread_pool*)pthread;//main thread pool struct instance
//get current thread id
curid = pthread_self();
//get current thread's seq in the thread info struct array.
nseq = this->get_thread_by_id(this, curid);
if(nseq < 0)
return NULL;
printf("entering working thread %d, thread id is %lu\n", nseq, (unsigned long)curid);
//wait cond for processing real job.
while( TRUE ){
pthread_mutex_lock(&this->thread_info[nseq].thread_lock);
pthread_cond_wait(&this->thread_info[nseq].thread_cond, &this->thread_info[nseq].thread_lock);
pthread_mutex_unlock(&this->thread_info[nseq].thread_lock);
printf("%lu thread do work!\n", (unsigned long)pthread_self());
tp_work *work = this->thread_info[nseq].th_work;
tp_work_desc *job = this->thread_info[nseq].th_job;
//process
work->process_job(work, job);
//thread state be set idle after work
pthread_mutex_lock(&this->thread_info[nseq].thread_lock);
this->thread_info[nseq].is_busy = FALSE;
pthread_mutex_unlock(&this->thread_info[nseq].thread_lock);
printf("%lu do work over\n", (unsigned long)pthread_self());
}
}
/**
 * internal interface. manage thread: deletes idle threads when the pool is mostly idle.
 * para:
 * pthread: thread pool struct pointer
 * return:
 */
static void *tp_manage_thread(void *pthread){
tp_thread_pool *this = (tp_thread_pool*)pthread;//main thread pool struct instance
//wait one interval before the first check
sleep(MANAGE_INTERVAL);
do{
if( this->get_tp_status(this) == 0 ){
do{
if( !this->delete_thread(this) )
break;
}while(TRUE);
}//end for if
//sleep between status checks
sleep(MANAGE_INTERVAL);
}while(TRUE);
return NULL; //not reached; silences the missing-return warning
}
四. Introduction to Database Connection Pools
A database connection is a critical, limited, and expensive resource, which is especially evident in multi-user web applications.
Each database connection object corresponds to one physical database connection. Opening a physical connection for every operation and closing it afterwards makes the system perform poorly. The connection pool solution is to establish enough database connections when the application starts and group them into a pool (simply put: a "pool" holding many ready-made database connection objects); the application then dynamically requests, uses, and releases connections from the pool. Concurrent requests exceeding the number of pooled connections should wait in a request queue, and the application can grow or shrink the number of pooled connections according to their utilization.
Connection pooling reuses memory-hungry resources as much as possible, saving a great deal of memory, raising server efficiency, and allowing more clients to be served. Using a connection pool greatly improves program efficiency, and the pool's own management mechanism can monitor the number and usage of database connections.
1) The minimum number of connections is the number the pool always keeps open, so if the application actually uses few connections, a large amount of connection resource is wasted;
2) The maximum number of connections is the most the pool can allocate; connection requests beyond this number are placed in a waiting queue, which delays subsequent database operations.