幾個基本的線程函數:
線程操縱函數
創建: int pthread_create(pthread_t *tidp, const pthread_attr_t *attr, (void*)(*start_rtn)(void *), void *arg);
終止自身 void pthread_exit(void *retval);
終止其他: int pthread_cancel(pthread_t tid); 發送終止信號後目標線程不一定終止,要調用join函數等待
阻塞並等待其他線程:int pthread_join(pthread_t tid, void **retval);
屬性
初始化: int pthread_attr_init(pthread_attr_t *attr);
設置分離狀態: pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate)
銷毀屬性: int pthread_attr_destroy(pthread_attr_t *attr);
同步函數
互斥鎖
初始化: int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
銷毀鎖: int pthread_mutex_destroy(pthread_mutex_t *mutex);
加鎖: pthread_mutex_lock(pthread_mutex_t *mutex);
嘗試加鎖: int pthread_mutex_trylock(pthread_mutex_t *mutex);
解鎖: int pthread_mutex_unlock(pthread_mutex_t *mutex);
條件變量
初始化:int pthread_cond_init(pthread_cond_t *cv, const pthread_condattr_t *cattr);
銷毀: int pthread_cond_destroy(pthread_cond_t *cond);
等待條件: pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
通知: pthread_cond_signal(pthread_cond_t *cond); 喚醒第一個調用pthread_cond_wait()而進入睡眠的線程
工具函數
比較線程ID: int pthread_equal(pthread_t t1, pthread_t t2);
分離線程: pthread_detach(pthread_t tid);
自身ID: pthread_t pthread_self(void);
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <pthread.h> //linux環境中多線程的頭文件,非C語言標准庫,編譯時最後要加 -lpthread 調用動態鏈接庫 4 5 //工作鏈表的結構 6 typedef struct worker { 7 void *(*process) (void *arg); //工作函數 8 void *arg; //函數的參數 9 struct worker *next; 10 }CThread_worker; 11 12 //線程池的結構 13 typedef struct { 14 pthread_mutex_t queue_lock; //互斥鎖 15 pthread_cond_t queue_ready; //條件變量/信號量 16 17 CThread_worker *queue_head; //指向工作鏈表的頭結點,臨界區 18 int cur_queue_size; //記錄鏈表中工作的數量,臨界區 19 20 int max_thread_num; //最大線程數 21 pthread_t *threadid; //線程ID 22 23 int shutdown; //開關 24 }CThread_pool; 25 26 static CThread_pool *pool = NULL; //一個線程池變量 27 int pool_add_worker(void *(*process)(void *arg), void *arg); //負責向工作鏈表中添加工作 28 void *thread_routine(void *arg); //線程例程 29 30 //線程池初始化 31 void 32 pool_init(int max_thread_num) 33 { 34 int i = 0; 35 36 pool = (CThread_pool *) malloc (sizeof (CThread_pool)); //創建線程池 37 38 pthread_mutex_init(&(pool->queue_lock), NULL); //互斥鎖初始化,參數為鎖的地址 39 pthread_cond_init( &(pool->queue_ready), NULL); //條件變量初始化,參數為變量地址 40 41 pool->queue_head = NULL; 42 pool->cur_queue_size = 0; 43 44 pool->max_thread_num = max_thread_num; 45 pool->threadid = (pthread_t *) malloc(max_thread_num * sizeof(pthread_t)); 46 for (i = 0; i < max_thread_num; i++) { 47 pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL); //創建線程, 參數為線程ID變量地址、屬性、例程、參數 48 } 49 50 pool->shutdown = 0; 51 } 52 53 //例程,調用具體的工作函數 54 void * 55 thread_routine(void *arg) 56 { 57 printf("starting thread 0x%x\n", (int)pthread_self()); 58 while(1) { 59 pthread_mutex_lock(&(pool->queue_lock)); //從工作鏈表中取工作,要先加互斥鎖,參數為鎖地址 60 61 while(pool->cur_queue_size == 0 && !pool->shutdown) { //鏈表為空 62 printf("thread 0x%x is waiting\n", (int)pthread_self()); 63 pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); //等待資源,信號量用於通知。會釋放第二個參數的鎖,以供添加;函數返回時重新加鎖。 64 } 65 66 if(pool->shutdown) { 67 pthread_mutex_unlock(&(pool->queue_lock)); //結束開關開啟,釋放鎖並退出線程 68 printf("thread 0x%x will exit\n", (int)pthread_self()); 69 pthread_exit(NULL); //參數為void * 70 } 71 72 printf("thread 0x%x is starting to work\n", (int)pthread_self()); 73 74 --pool->cur_queue_size; 75 CThread_worker *worker = pool->queue_head; 76 pool->queue_head = worker->next; 77 78 pthread_mutex_unlock (&(pool->queue_lock)); //獲取一個工作後釋放鎖 79 80 81 (*(worker->process))(worker->arg); //做工作 82 free(worker); 83 worker = NULL; 84 } 85 86 pthread_exit(NULL); 87 } 88 89 //銷毀線程池 90 int 91 pool_destroy() 92 { 93 if(pool->shutdown) //檢測結束開關是否開啟,若開啟,則所有線程會自動退出 94 return -1; 95 pool->shutdown = 1; 96 97 pthread_cond_broadcast( &(pool->queue_ready) ); //廣播,喚醒所有線程,准備退出 98 99 int i; 100 for(i = 0; i < pool->max_thread_num; ++i) 101 pthread_join(pool->threadid[i], NULL); //主線程等待所有線程退出,只有join第一個參數不是指針,第二個參數類型是void **,接收exit的返回值,需要強制轉換 102 free(pool->threadid); 103 104 CThread_worker *head = NULL; 105 while(pool->queue_head != NULL) { //釋放未執行的工作鏈表剩余結點 106 head = pool->queue_head; 107 pool->queue_head = pool->queue_head->next; 108 free(head); 109 } 110 111 pthread_mutex_destroy(&(pool->queue_lock)); //銷毀鎖和條件變量 112 pthread_cond_destroy(&(pool->queue_ready)); 113 114 free(pool); 115 pool=NULL; 116 return 0; 117 } 118 119 void * 120 myprocess(void *arg) 121 { 122 printf("threadid is 0x%x, working on task %d\n", (int)pthread_self(), *(int*)arg); 123 sleep (1); 124 return NULL; 125 } 126 127 //添加工作 128 int 129 pool_add_worker(void *(*process)(void *arg), void *arg) 130 { 131 CThread_worker *newworker = (CThread_worker *) malloc(sizeof(CThread_worker)); 132 newworker->process = process; //具體的工作函數 133 newworker->arg = arg; 134 newworker->next = NULL; 135 136 pthread_mutex_lock( &(pool->queue_lock) ); //加鎖 137 138 CThread_worker *member = pool->queue_head; //插入鏈表尾部 139 if( member != NULL ) { 140 while( member->next != NULL ) 141 member = member->next; 142 member->next = newworker; 143 } 144 else { 145 pool->queue_head = newworker; 146 } 147 ++pool->cur_queue_size; 148 149 pthread_mutex_unlock( &(pool->queue_lock) ); //解鎖 150 151 pthread_cond_signal( &(pool->queue_ready) ); //通知一個等待的線程 152 return 0; 153 } 154 155 int 156 main(int argc, char **argv) 157 { 158 pool_init(3); //主線程創建線程池,3個線程 159 160 int *workingnum = (int *) malloc(sizeof(int) * 10); 161 int i; 162 for(i = 0; i < 10; ++i) { 163 workingnum[i] = i; 164 pool_add_worker(myprocess, &workingnum[i]); //主線程負責添加工作,10個工作 165 } 166 167 sleep (5); 168 pool_destroy(); //銷毀線程池 169 free (workingnum); 170 171 return 0; 172 }
最簡單的可以利用java.util.concurrent.Executors
調用Executors.newCachedThreadPool()獲取緩沖式線程池
Executors.newFixedThreadPool(int nThreads)獲取固定大小的線程池
其中Resin從V3.0後需要購買才能用於商業目的,而其他兩種則是純開源的。可以分別從他們的網站上下載最新的二進制包和源代碼。
作為Web容器,需要承受較高的訪問量,能夠同時響應不同用戶的請求,能夠在惡劣環境下保持較高的穩定性和健壯性。在HTTP服務器領域,ApacheHTTPD的效率是最高的,也是最為穩定的,但它只能處理靜態頁面的請求,如果需要支持動態頁面請求,則必須安裝相應的插件,比如mod_perl可以處理Perl腳本,mod_python可以處理Python腳本。
上面介紹的三中Web容器,都是使用Java編寫的HTTP服務器,當然他們都可以嵌到Apache中使用,也可以獨立使用。分析它們處理客戶請求的方法有助於了解Java多線程和線程池的實現方法,為設計強大的多線程服務器打好基礎。
Tomcat是使用最廣的Java Web容器,功能強大,可擴展性強。最新版本的Tomcat(5.5.17)為了提高響應速度和效率,使用了Apache Portable Runtime(APR)作為最底層,使用了APR中包含Socket、緩沖池等多種技術,性能也提高了。APR也是Apache HTTPD的最底層。可想而知,同屬於ASF(Apache Software Foundation)中的成員,互補互用的情況還是很多的,雖然使用了不同的開發語言。
Tomcat 的線程池位於tomcat-util.jar文件中,包含了兩種線程池方案。方案一:使用APR的Pool技術,使用了JNI;方案二:使用Java實現的ThreadPool。這裡介紹的是第二種。如果想了解APR的Pool技術,可以查看APR的源代碼。
ThreadPool默認創建了5個線程,保存在一個200維的線程數組中,創建時就啟動了這些線程,當然在沒有請求時,它們都處理等待狀態(其實就是一個while循環,不停的等待notify)。如果有請求時,空閒線程會被喚醒執行用戶的請求。
具體的請求過程是:服務啟動時,創建一個一維線程數組(maxThread=200個),並創建空閒線程(minSpareThreads=5個)隨時等待用戶請求。當有用戶請求時,調用 threadpool.runIt(ThreadPoolRunnable)方法,將一個需要執行的實例傳給ThreadPool中。其中用戶需要執行的實例必須實現ThreadPoolRunnable接口。 ThreadPool首先查找空閒的線程,如果有則用它運行要執行ThreadPoolRunnable;如果沒有空閒線程並且沒有超過 maxThreads,就一次性創建minSpareThreads個空閒線程;如果已經超過了maxThreads了,就等待空閒線程了。總之,要找到空閒的線程,以便用它執行實例。找到後,將該線程從線程數組中移走。接著喚醒已經找到的空閒線程,用它運行執行實例(ThreadPoolRunnable)。運行完ThreadPoolRunnable後,就將該線程重新放到線程數組中,作為空閒線程供後續使用。
由此可以看出,Tomcat的線程池實現是比較簡單的,ThreadPool.java也只有840行代碼。用一個一維數組保存空閒的線程,每次以一個較小步伐(5個)創建空閒線程並放到線程池中。使用時從數組中移走空閒的線程,用完後,再歸還給線程池。