(代碼參考了網上的一些實現。)目前還有幾個功能尚待實現,也有一些 bug 需要修正,例如線程池的動態增長:
詳細請看我的github: https://github.com/chengshuguang/thread-pool
thread_pool.h
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>

/* One queued unit of work, stored as a node of a singly linked list. */
typedef struct task
{
    void *(*taskfunc)(void *arg); /* callback invoked by a worker thread */
    void *arg;                    /* argument passed to taskfunc */
    struct task *next;            /* next queued task, NULL for the last one */
} task;

/* State of the process-wide thread pool. */
typedef struct thread_pool
{
    task *task_queue_head;        /* first pending task (tasks are taken here) */
    task *task_queue_end;         /* last pending task (tasks are appended here) */
    int task_queue_size;          /* number of pending tasks */

    pthread_t *thread_queue;      /* array of worker thread handles */
    int thread_num;               /* number of worker threads */
    int idle_thread_num;          /* workers not currently running a task */

    int is_pool_destroyed;        /* non-zero once destruction has started */
    pthread_mutex_t queue_mutex;  /* guards access to the task queue */
    pthread_cond_t queue_cond;    /* workers wait on this for tasks or shutdown */
} thread_pool;

#ifdef __cplusplus
extern "C" {
#endif

/* The single pool instance; NULL when no pool exists. */
extern thread_pool *pool;

/*
 * Create the global pool with thread_pool_size worker threads.
 * Returns 0 on success, -1 on failure.
 */
extern int thread_pool_init(int thread_pool_size);

//extern void * thread_pool_entrance(void *arg);

/*
 * Append a task to the queue; a worker later calls taskfunc(arg).
 * Returns 0 on success.
 */
extern int thread_pool_add_task(void *(*taskfunc)(void *arg), void *arg);

/*
 * Stop all workers, wait for them, and release the pool.
 * Returns 0 on success, -1 if the pool was already being destroyed.
 * Declared with (void) so that it is a real prototype in C as well as C++.
 */
extern int thread_pool_destroy(void);

#ifdef __cplusplus
}
#endif

#endif //THREAD_POOL_H
thread_pool.c
#include "thread_pool.h"

#include <pthread.h>
#include <stdint.h>

/* The single process-wide pool; NULL whenever no pool exists. */
thread_pool *pool = NULL;

/*
 * Worker thread body.
 *
 * arg carries the worker's integer index (smuggled through the pointer).
 * The worker repeatedly waits for a task, removes it from the head of the
 * queue, runs it outside the lock, and frees the task node. It returns as
 * soon as it observes is_pool_destroyed; tasks still queued at that point
 * are not executed (thread_pool_destroy frees them).
 */
void *thread_pool_entrance(void *arg)
{
    int thread_id = (int)(intptr_t)arg;

    printf("thread %d is created\n", thread_id);

    pthread_mutex_lock(&pool->queue_mutex);
    for (;;) {
        /* Re-test the predicate in a loop: condition waits may wake spuriously. */
        while (pool->task_queue_size == 0 && !pool->is_pool_destroyed) {
            /* Releases the mutex while blocked and re-acquires it on wake-up. */
            pthread_cond_wait(&pool->queue_cond, &pool->queue_mutex);
        }

        if (pool->is_pool_destroyed) {
            printf("thread %d exit!!!\n", thread_id);
            /* Early exit path: the mutex must be released before leaving. */
            pthread_mutex_unlock(&pool->queue_mutex);
            return NULL;
        }

        /* This worker is now busy; detach the head task from the queue. */
        pool->idle_thread_num--;
        task *work = pool->task_queue_head;
        pool->task_queue_head = work->next;
        if (pool->task_queue_head == NULL) {
            pool->task_queue_end = NULL;
        }
        pool->task_queue_size--;
        pthread_mutex_unlock(&pool->queue_mutex);

        /* Run the callback without holding the lock, then release the node. */
        (*work->taskfunc)(work->arg);
        free(work);

        /* idle_thread_num is shared state, so update it under the mutex. */
        pthread_mutex_lock(&pool->queue_mutex);
        pool->idle_thread_num++;
    }
}

/*
 * Create the global pool with thread_pool_size worker threads.
 *
 * The mutex and condition variable are initialised before any worker is
 * started, because workers lock the mutex immediately.
 *
 * Returns 0 on success. Returns -1 if thread_pool_size is not positive, if
 * memory or synchronisation objects cannot be set up, or if a worker thread
 * cannot be created (in which case the partially built pool is torn down).
 */
int thread_pool_init(int thread_pool_size)
{
    if (thread_pool_size <= 0) {
        return -1;
    }

    thread_pool *p = malloc(sizeof *p);
    if (p == NULL) {
        return -1;
    }

    p->is_pool_destroyed = 0;
    p->task_queue_head = NULL;
    p->task_queue_end = NULL;
    p->task_queue_size = 0;
    p->thread_num = 0; /* counts only the workers actually created so far */
    p->idle_thread_num = thread_pool_size;

    p->thread_queue = malloc((size_t)thread_pool_size * sizeof *p->thread_queue);
    if (p->thread_queue == NULL) {
        free(p);
        return -1;
    }

    if (pthread_mutex_init(&p->queue_mutex, NULL) != 0) {
        free(p->thread_queue);
        free(p);
        return -1;
    }

    if (pthread_cond_init(&p->queue_cond, NULL) != 0) {
        pthread_mutex_destroy(&p->queue_mutex);
        free(p->thread_queue);
        free(p);
        return -1;
    }

    /* Publish the pool before starting workers: they access it via `pool`. */
    pool = p;

    for (int i = 0; i < thread_pool_size; i++) {
        /* pthread_create reports failure with a non-zero error number. */
        int ret = pthread_create(&p->thread_queue[i], NULL,
                                 thread_pool_entrance, (void *)(intptr_t)i);
        if (ret != 0) {
            printf("thread create error!!!\n");
            /* thread_num == i here, so only existing workers are joined. */
            thread_pool_destroy();
            return -1;
        }
        p->thread_num = i + 1;
    }

    return 0;
}

typedef void *(*taskfunc)(void *arg);

/*
 * Append a task to the tail of the queue and wake one waiting worker.
 *
 * Returns 0 on success, -1 if no pool exists, func is NULL, the pool is
 * being destroyed, or the task node cannot be allocated.
 */
int thread_pool_add_task(taskfunc func, void *arg)
{
    if (pool == NULL || func == NULL) {
        return -1;
    }

    task *newtask = malloc(sizeof *newtask);
    if (newtask == NULL) {
        return -1;
    }
    newtask->taskfunc = func;
    newtask->arg = arg;
    newtask->next = NULL;

    pthread_mutex_lock(&pool->queue_mutex);

    if (pool->is_pool_destroyed) {
        /* Workers are shutting down; nothing would ever run this task. */
        pthread_mutex_unlock(&pool->queue_mutex);
        free(newtask);
        return -1;
    }

    if (pool->task_queue_head == NULL) {
        pool->task_queue_head = newtask;
    } else {
        pool->task_queue_end->next = newtask;
    }
    pool->task_queue_end = newtask;
    pool->task_queue_size++;

    pthread_cond_signal(&pool->queue_cond);
    pthread_mutex_unlock(&pool->queue_mutex);

    return 0;
}

/*
 * Shut the pool down: flag destruction, wake every worker, join them, free
 * all still-queued tasks (without running them), and release the pool.
 *
 * Returns 0 on success, -1 if no pool exists or destruction already began.
 */
int thread_pool_destroy(void)
{
    if (pool == NULL) {
        return -1;
    }

    /*
     * The flag is part of the workers' wait predicate, so it must change
     * under the mutex; otherwise a worker could test it, miss the broadcast,
     * and block forever.
     */
    pthread_mutex_lock(&pool->queue_mutex);
    if (pool->is_pool_destroyed) { /* guard against repeated destruction */
        pthread_mutex_unlock(&pool->queue_mutex);
        return -1;
    }
    pool->is_pool_destroyed = 1;
    pthread_cond_broadcast(&pool->queue_cond);
    pthread_mutex_unlock(&pool->queue_mutex);

    /* Wait for every worker that was actually created. */
    for (int i = 0; i < pool->thread_num; i++) {
        pthread_join(pool->thread_queue[i], NULL);
    }

    /* Discard whatever is still queued. */
    while (pool->task_queue_head != NULL) {
        task *temp = pool->task_queue_head;
        pool->task_queue_head = temp->next;
        free(temp);
    }
    pool->task_queue_end = NULL;
    pool->task_queue_size = 0;

    free(pool->thread_queue);
    pool->thread_queue = NULL;

    pthread_mutex_destroy(&pool->queue_mutex);
    pthread_cond_destroy(&pool->queue_cond);

    free(pool);
    pool = NULL;

    return 0;
}
#include "thread_pool.h"

#include <stdint.h>
#include <stdio.h>

/* Demo task: prints a fixed line and pauses briefly; arg is not used. */
void *taskprocess(void *arg)
{
    (void)arg;
    printf("aaaaaadoing tasksaaaaaaaaa\n");
    usleep(1000);
    return NULL;
}

/*
 * Demo driver: starts a pool of 5 workers, submits 10 tasks, waits a second
 * for them to run, then destroys the pool.
 */
int main(void)
{
    /* Without a pool, submitting tasks would dereference a NULL pointer. */
    if (thread_pool_init(5) != 0) {
        return 1;
    }

    for (int i = 1; i <= 10; i++) {
        /* Go through intptr_t so the int-to-pointer conversion is well sized. */
        thread_pool_add_task(taskprocess, (void *)(intptr_t)i);
        usleep(1000);
    }

    /* Give the workers time to finish: destroy does not run queued tasks. */
    sleep(1);
    thread_pool_destroy();
    return 0;
}