線程池實現:
用於執行大量相對短暫的任務
當任務增加的時候能夠動態的增加線程池中線程的數量直到達到一個阈值。
當任務執行完畢的時候,能夠動態的銷毀線程池中的線程
該線程池的實現本質上也是生產者與消費模型的應用。生產者線程向任務隊列中添加任務,一旦隊列有任務到來,如果有等待線程就喚醒來執行任務,如果沒有等待線程並且線程數沒有達到阈值,就創建新線程來執行任務。
contion.h
#ifndef _CONDITION_H_
#define _CONDITION_H_
#include
typedef struct condition
{
pthread_mutex_t pmutex;
pthread_cond_t pcond;
} condition_t;
int condition_init(condition_t *cond);
int condition_lock(condition_t *cond);
int condition_unlock(condition_t *cond);
int condition_wait(condition_t *cond);
int condition_timedwait(condition_t *cond, const struct timespec *abstime);
int condition_signal(condition_t *cond);
int condition_broadcast(condition_t *cond);
int condition_destroy(condition_t *cond);
#endif /* _CONDITION_H_ */
condition.c
#include "condition.h"
int condition_init(condition_t *cond)
{
int status;
if ((status = pthread_mutex_init(&cond->pmutex, NULL)))
return status;
if ((status = pthread_cond_init(&cond->pcond, NULL)))
return status;
return 0;
}
int condition_lock(condition_t *cond)
{
return pthread_mutex_lock(&cond->pmutex);
}
int condition_unlock(condition_t *cond)
{
return pthread_mutex_unlock(&cond->pmutex);
}
int condition_wait(condition_t *cond)
{
return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}
int condition_signal(condition_t *cond)
{
return pthread_cond_signal(&cond->pcond);
}
int condition_broadcast(condition_t* cond)
{
return pthread_cond_broadcast(&cond->pcond);
}
int condition_destroy(condition_t* cond)
{
int status;
if ((status = pthread_mutex_destroy(&cond->pmutex)))
return status;
if ((status = pthread_cond_destroy(&cond->pcond)))
return status;
return 0;
}
threadpool.h
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include "condition.h"
// 任務結構體,將任務放入隊列由線程池中的線程來執行
typedef struct task
{
void *(*run)(void *arg);
// 任務回調函數
void *arg;
// 回調函數參數
struct task *next;
} task_t;
// 線程池結構體
typedef struct threadpool
{
condition_t ready;
//任務准備就緒或者線程池銷毀通知
task_t *first;
//任務隊列頭指針
task_t *last;
//任務隊列尾指針
int counter;
//線程池中當前線程數
int idle;
//線程池中當前正在等待任務的線程數
int max_threads;
//線程池中最大允許的線程數
int quit;
//銷毀線程池的時候置1
} threadpool_t;
// 初始化線程池
void threadpool_init(threadpool_t *pool, int threads);
// 往線程池中添加任務
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);
// 銷毀線程池
void threadpool_destroy(threadpool_t *pool);
#endif /* _THREAD_POOL_H_ */
threadpool.c
#include "threadpool.h"
#include
#include
#include
#include
#include
void *thread_routine(void *arg)
{
struct timespec abstime;
int timeout;
printf("thread 0x%x is starting\n", (int)pthread_self());
threadpool_t *pool = (threadpool_t *)arg;
while (1)
{
timeout = 0;
condition_lock(&pool->ready);
pool->idle++;
// 等待隊列有任務到來或者線程池銷毀通知
while (pool->first == NULL && !pool->quit)
{
printf("thread 0x%x is waiting\n", (int)pthread_self());
//condition_wait(&pool->ready);
clock_gettime(CLOCK_REALTIME, &abstime);
abstime.tv_sec += 2;
int status = condition_timedwait(&pool->ready, &abstime);
if (status == ETIMEDOUT)
{
printf("thread 0x%x is wait timed out\n", (int)pthread_self());
timeout = 1;
break;
}
}
// 等待到條件,處於工作狀態
pool->idle--;
// 等待到任務
if (pool->first != NULL)
{
// 從隊頭取出任務
task_t *t = pool->first;
pool->first = t->next;
// 執行任務需要一定的時間,所以要先解鎖,以便生產者進程
// 能夠往隊列中添加任務,其它消費者線程能夠進入等待任務
condition_unlock(&pool->ready);
t->run(t->arg);
free(t);
condition_lock(&pool->ready);
}
// 如果等待到線程池銷毀通知, 且任務都執行完畢
if (pool->quit && pool->first == NULL)
{
pool->counter--;
if (pool->counter == 0)
condition_signal(&pool->ready);
condition_unlock(&pool->ready);
// 跳出循環之前要記得解鎖
break;
}
if (timeout && pool->first == NULL)
{
pool->counter--;
condition_unlock(&pool->ready);
// 跳出循環之前要記得解鎖
break;
}
condition_unlock(&pool->ready);
}
printf("thread 0x%x is exting\n", (int)pthread_self());
return NULL;
}
// 初始化線程池
void threadpool_init(threadpool_t *pool, int threads)
{
// 對線程池中的各個字段初始化
condition_init(&pool->ready);
pool->first = NULL;
pool->last = NULL;
pool->counter = 0;
pool->idle = 0;
pool->max_threads = threads;
pool->quit = 0;
}
// 往線程池中添加任務
void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)
{
// 生成新任務
task_t *newtask = (task_t *)malloc(sizeof(task_t));
newtask->run = run;
newtask->arg = arg;
newtask->next = NULL;
condition_lock(&pool->ready);
// 將任務添加到隊列
if (pool->first == NULL)
pool->first = newtask;
else
pool->last->next = newtask;
pool->last = newtask;
// 如果有等待線程,則喚醒其中一個
if (pool->idle > 0)
condition_signal(&pool->ready);
else if (pool->counter < pool->max_threads)
{
// 沒有等待線程,並且當前線程數不超過最大線程數,則創建一個新線程
pthread_t tid;
pthread_create(&tid, NULL, thread_routine, pool);
pool->counter++;
}
condition_unlock(&pool->ready);
}
// 銷毀線程池
void threadpool_destroy(threadpool_t *pool)
{
if (pool->quit)
{
return;
}
condition_lock(&pool->ready);
pool->quit = 1;
if (pool->counter > 0)
{
if (pool->idle > 0)
condition_broadcast(&pool->ready);
// 處於執行任務狀態中的線程,不會收到廣播
// 線程池需要等待執行任務狀態中的線程全部退出
while (pool->counter > 0)
condition_wait(&pool->ready);
}
condition_unlock(&pool->ready);
condition_destroy(&pool->ready);
}
main.c
#include "threadpool.h"
#include
#include
#include
void* mytask(void *arg)
{
printf("thread 0x%x is working on task %d\n", (int)pthread_self(), *(int*)arg);
sleep(1);
free(arg);
return NULL;
}
int main(void)
{
threadpool_t pool;
threadpool_init(&pool, 3);
int i;
for (i=0; i<10; i++)
{
int *arg = (int *)malloc(sizeof(int));
*arg = i;
threadpool_add_task(&pool, mytask, arg);
}
//sleep(15);
threadpool_destroy(&pool);
return 0;
}
makefile:
.PHONY:clean
CC=gcc
CFLAGS=-Wall -g
ALL=main
all:$(ALL)
OBJS=threadpool.o main.o condition.o
.c.o:
$(CC) $(CFLAGS) -c $<
main:$(OBJS)
$(CC) $(CFLAGS) $^ -o $@ -lpthread -lrt
clean:
rm -f $(ALL) *.o