.
.
.
.
.
目錄
(一) 一起學 Unix 環境高級編程 (APUE) 之 標准IO
(二) 一起學 Unix 環境高級編程 (APUE) 之 文件 IO
(三) 一起學 Unix 環境高級編程 (APUE) 之 文件和目錄
(四) 一起學 Unix 環境高級編程 (APUE) 之 系統數據文件和信息
(五) 一起學 Unix 環境高級編程 (APUE) 之 進程環境
(六) 一起學 Unix 環境高級編程 (APUE) 之 進程控制
(七) 一起學 Unix 環境高級編程 (APUE) 之 進程關系 和 守護進程
(八) 一起學 Unix 環境高級編程 (APUE) 之 信號
(九) 一起學 Unix 環境高級編程 (APUE) 之 線程
(十) 一起學 Unix 環境高級編程 (APUE) 之 線程控制
本章主要講線程的基本操作,線程的創建、取消、終止、同步等等。
實際項目中多線程用得比較多,因為多線程是先有標准後有實現的,所以不會向多進程那樣在不同平台上有許多不同的情況。
線程就是一個正在運行的函數。
C 語言線程有很多標准,POSIX 是其中的一種。
POSIX 是一套標准,而不是一種實現。
正因為 POSIX 是一套標准而不是實現,所以 POSIX 只是規定了 pthread_t 作為線程標識符,但是並沒有規定它必須是由什麼類型組成的,所以在有的平台上它可能是 int,有些平台上它可能是 struct,還有些平台上它可能是 union,所以不要直接操作這個類型,而是要使用 POSIX 規定的各種線程函數來操作它。
有木有覺得像標准 IO 裡 FILE 的趕腳?沒錯,標准制定出來的很多東西都是這種風格的,它為你提供一個數據類型而不讓你直接對這個類型操作,要通過它定義的一系列函數來實現對這個類型的操作,這樣就在各個平台上實現統一的接口了,所以這樣做才能讓標准制定出來的東西具有較好的可移植性。
pthread_t 是個很重要的東西,我們所有使用 PSOIX 標准的線程操作都是圍繞著它來進行的,通過它配合各種函數就可以對線程進行各種花樣作死的玩了。:)
我記得在前面進程的博文中 LZ 介紹過幾種 ps(1) 命令的使用方式,用來觀察進程的關系和狀態。在本篇開始之前 LZ 再補充一個 ps(1) 命令的組合,用來查看線程的情況,方便我們調試程序。
>$ ps ax -L PID LWP TTY STAT TIME COMMAND 1 1 ? Ss 0:02 /sbin/init 2 2 ? S 0:00 [kthreadd] 3 3 ? S 0:00 [ksoftirqd/0] 877 877 ? Ss 0:06 dbus-daemon --system --fork 948 948 ? Ssl 0:00 /usr/sbin/ModemManager 948 965 ? Ssl 0:00 /usr/sbin/ModemManager 948 975 ? Ssl 0:00 /usr/sbin/ModemManager 956 956 ? Ss 0:00 /usr/sbin/bluetoothd >$
PID 是進程號,LWP 是線程 ID。
這裡看到的 PID 為 948 的進程有三個 LWP,它們就是三個線程。
1. pthread_equal(3)
1 pthread_equal - compare thread IDs 2 3 #include <pthread.h> 4 5 int pthread_equal(pthread_t t1, pthread_t t2); 6 7 Compile and link with -pthread.
第一個要介紹的函數是 pthread_equal(3),比較兩個線程標識符是否相同。為什麼不能使用 if (t1 == t2) 的方式比較兩個線程標識符呢?就像我們上面說的,因為你不知道 pthread_t 是什麼類型的,所以永遠不要自己直接操作它。
2. pthread_self(3)
1 pthread_self - obtain ID of the calling thread 2 3 #include <pthread.h> 4 5 pthread_t pthread_self(void); 6 7 Compile and link with -pthread.
大家還記得一個進程可以通過 getpid(2) 函數獲得當前進程的 ID 號吧?pthread_self(3) 就是獲得當前線程 ID 的函數。
3. pthread_create(3)
1 pthread_create - create a new thread 2 3 #include <pthread.h> 4 5 int pthread_create(pthread_t *thread, const pthread_attr_t *attr, 6 void *(*start_routine) (void *), void *arg); 7 8 Compile and link with -pthread.
pthread_create(3) 函數的作用就是創建一個新線程。
參數列表:
thread:由函數回填的線程標識符,它來唯一的標識產生的新線程,後面我們只要需要操作新線程就需要用到它;
attr:線程屬性,在本篇博文(第 11 章)中,所有的屬性都是使用 NULL,也就是使用默認屬性。
start_routine:線程的執行函數;入參是 void*,返回值是 void*,恭喜你,這兩個值的類型都是百搭的,任何類型你都可以在這使用了。
arg:傳遞給 start_routine 的 void* 參數。
返回值:成功返回 0;失敗返回 errno。為什麼線程函數返回的是 errno 呢?因為在一些平台上 error 是全局變量,如果大家都使用同一個全局變量,在多線程的情況下就可能會出現競爭,所以 POSIX 的線程函數一般在失敗的時候都是直接返回 errno 的,這樣就避免了某些平台 errno 的缺陷了。
新線程和當前的線程是兩個兄弟線程,他們是平等的,沒有父子關系。
新線程被創建之後,這兩個線程哪個先執行是不確定的,由調度器來決定。如果你希望哪個線程一定先執行,那麼就在其它線程中使用類似 sleep(3) 的函數讓它們等一會兒再運行。
4. pthread_exit(3)
1 pthread_exit - terminate calling thread 2 3 #include <pthread.h> 4 5 void pthread_exit(void *retval); 6 7 Compile and link with -pthread.
在線程執行函數中調用,作用是退出當前線程,並將返回值通過 retval 參數返回給調用 pthread_join(3) 函數的地方,如果不需要返回值可以傳入 NULL。
pthread_join(3) 是為線程收屍的函數,我們會在下面詳細介紹。
我們先來看個小栗子,直觀的了解下線程是如何被創建的,以及它是如何工作的。
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <pthread.h> 4 #include <string.h> 5 #include <unistd.h> 6 7 static void *func(void *p) 8 { 9 puts("Thread is working."); 10 11 sleep(10); // 延時是為了方便我們使用 ps(1) 命令驗證線程是否被創建了 12 13 pthread_exit(NULL); 14 // return NULL; 15 } 16 17 int main() 18 { 19 pthread_t tid; 20 int err; 21 22 puts("Begin!"); 23 24 // 創建線程 25 err = pthread_create(&tid,NULL,func,NULL); 26 if(err) 27 { 28 fprintf(stderr,"pthread_create():%s\n",strerror(err)); 29 exit(1); 30 } 31 32 // 為線程收屍 33 pthread_join(tid,NULL); 34 35 puts("End!"); 36 37 exit(0); 38 }
編譯並運行,同時使用 LZ 上面介紹過的 ps(1) 命令驗證線程是否被創建成功了。
>$ gcc -Wall create.c -o create -pthread >$ ./create Begin! Thread is working. End! >$ # 在線程結束之前打開另一個終端,驗證線程的狀態 >$ ps ax -L PID LWP TTY STAT TIME COMMAND 4354 4354 pts/1 Sl+ 0:00 ./create 4354 4355 pts/1 Sl+ 0:00 ./create >$
通過 ps(1) 命令的驗證,可以看到這兩個線程擁有同一個 PID 不同的 LWP,所以可以直觀的看出來我們的線程創建成功了!
大家注意,編譯 POSIX 線程程序的時候需要使用 -pthread 參數,這個其實在 man 手冊裡已經說得很清楚了,但是還是總有童鞋在編譯的時候沒有加這個參數然後還問我為什麼編譯不通過,鄙視你們這幫粗心的家伙。。
5. pthread_cancel(3)
1 pthread_cancel - send a cancellation request to a thread 2 3 #include <pthread.h> 4 5 int pthread_cancel(pthread_t thread); 6 7 Compile and link with -pthread.
pthread_cancel(3) 函數的作用是取消同一個進程中的其它線程線程。
為什麼要取消線程呢?當一個線程沒有必要繼續執行下去時,我們又沒法為它收屍,所以就需要先取消這個線程,然後再為它收屍。
比如在使用多線程遍歷一個很大的二叉樹查找一個數據時,其中某一個線程找到了要查找的數據,那麼其它線程就沒有必要繼續執行了,所以就可以取消它們了。
注意 pthread_cancel(3) 並不等待線程終止,它僅僅提出請求。
而線程收到這個請求也不會立即終止,線程要執行到取消點才能被取消,關於取消點在下一篇博文中會介紹。
6. pthread_join(3)
1 pthread_join - join with a terminated thread 2 3 #include <pthread.h> 4 5 int pthread_join(pthread_t thread, void **retval); 6 7 Compile and link with -pthread.
為線程收屍,在上面的栗子中大家已經見到了。不像 wait(2) 函數,線程之間誰都可以為別人收屍,它們之間是沒有父子關系的。而 wait(2) 函數只能是由父進程對子進程收屍。
參數列表:
thread:指定為哪個線程收屍;
retval:這個二級指針是什麼呢?它就是線程在退出的時候的返回值(pthread_exit(3) 的參數),它會把線程的返回值的地址回填到這個參數中。
7.線程清理處理程序(thread cleanup handler)
1 pthread_cleanup_push, pthread_cleanup_pop - push and pop thread cancellation clean-up handlers 2 3 #include <pthread.h> 4 5 void pthread_cleanup_push(void (*routine)(void *), void *arg); 6 void pthread_cleanup_pop(int execute); 7 8 Compile and link with -pthread.
就像在進程級別使用 atexit(3) 函數掛鉤子函數一樣,線程可能也需要在結束時執行一些清理工作,這時候就需要派出線程清理處理程序上場了。鉤子函數的調用順序也是逆序的,也就是執行順序與注冊順序相反。
這兩個是帶參的宏而不是函數,所以必須成對使用,而且必須先使用 pthread_cleanup_push 再使用 pthread_cleanup_pop,否則會報語法錯誤,括號不匹配。
參數列表:
routine:鉤子函數。
arg:傳遞給鉤子函數的參數。
execute:0 不調用該鉤子函數;1 調用該鉤子函數。
pthread_cleanup_pop 寫到哪都行,只要寫了讓語法不報錯就行,就算你把它寫到 pthread_exit(3) 下面也沒問題,但是 execute 參數就看不到了,所以無論 pthread_cleanup_pop 的參數是什麼,所有注冊過的鉤子函數都會被執行。
1 #include <pthread.h> 2 3 void routine (void *p) {} 4 5 void* fun (void *p) 6 { 7 pthread_cleanup_push(routine, NULL); 8 這裡是其它代碼 9 pthread_cleanup_pop(1); 10 }
預編譯,查看宏替換的結果:
>$ gcc -E cleanup.c void routine (void *p) {} void* fun (void *p) { do { __pthread_unwind_buf_t __cancel_buf; void (*__cancel_routine) (void *) = (routine); void *__cancel_arg = (((void *)0)); int not_first_call = __sigsetjmp ((struct __jmp_buf_tag *) (void *) __cancel_buf.__cancel_jmp_buf, 0); if (__builtin_expect (not_first_call, 0)) { __cancel_routine (__cancel_arg); __pthread_unwind_next (&__cancel_buf); } __pthread_register_cancel (&__cancel_buf); do {; 這裡是其它代碼 do { } while (0); } while (0); __pthread_unregister_cancel (&__cancel_buf); if (1) __cancel_routine (__cancel_arg); } while (0); }
通過預編譯可以看出來 pthread_cleanup_push 和 pthread_cleanup_pop 兩個宏被替換了,並且每個宏僅定義了一半,如果不成對寫另一個宏編譯的時候就會報括號不匹配的錯誤。
8. pthread_detach(3)
1 pthread_detach - detach a thread 2 3 #include <pthread.h> 4 5 int pthread_detach(pthread_t thread); 6 7 Compile and link with -pthread.
pthread_detach(3) 函數用於分離線程,被分離的線程是不能被收屍的。
9.互斥量(pthead_mutex_t)
多線程就是為了充分利用硬件資源,使程序可以並發的運行,但是只要是並發就會遇到競爭的問題,互斥量就是解決競爭的多種手段之一。
在介紹互斥量之前我們先思考一個問題:如何讓 20 個線程同時從一個文件中讀取數字,累加 1 然後再寫入回去,並且保證程序運行之後文件中的數值比運行程序之前大 20?
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <fcntl.h> 4 #include <unistd.h> 5 #include <pthread.h> 6 #include <string.h> 7 8 #include <sys/types.h> 9 #include <sys/stat.h> 10 11 #define BUFSIZE 32 12 13 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 14 15 static void *fun (void *p) 16 { 17 int fd = -1; 18 long long n = 0; 19 char buf[BUFSIZE] = ""; 20 21 fd = open(p, O_RDWR | O_CREAT, 0664); 22 /* if err */ 23 24 pthread_mutex_lock(&mutex); 25 26 read(fd, buf, BUFSIZE); 27 lseek(fd, 0, SEEK_SET); 28 n = atoll(buf); 29 snprintf(buf, BUFSIZE, "%lld\n", ++n); 30 write(fd, buf, strlen(buf)); 31 32 close(fd); 33 34 pthread_mutex_unlock(&mutex); 35 36 pthread_exit(NULL); 37 } 38 39 int main (int argc, char **argv) 40 { 41 pthread_t tids[20]; 42 int i = 0; 43 44 if (argc < 2) { 45 fprintf(stderr, "Usage %s <filename>\n", argv[0]); 46 return -1; 47 } 48 49 for (i = 0; i < 20; i++) { 50 pthread_create(&tids[i], NULL, fun, argv[1]); 51 /* if err */ 52 } 53 54 for (i = 0; i < 20; i++) { 55 pthread_join(tids[i], NULL); 56 } 57 58 pthread_mutex_destroy(&mutex); 59 60 return 0; 61 }
程序中每一個線程都要做:讀取文件 --- 累加 1 --- 寫入文件 的動作,如果 20 個線程同時做這件事,那麼就很有可能多個線程讀到的數據是相同的,這樣累加的結果也就是相同的了,就沒辦法保證 20 個線程每個人讀到的數據都是獨一無二的了。
怎麼樣才能讓 20 個線程讀到獨一無二的數值呢?很簡單,讓 讀取文件 --- 累加 1 --- 寫入文件 的這個動作同一時刻只能有一個線程來做,這樣每個線程讀取到的數值都是上一個線程寫入的數值了。那麼 讀取文件 --- 累加 1 --- 寫入文件 這段代碼(也就是發生競爭的這段區域)就叫做“臨界區”。
互斥量正如它的名字描述的一般,可以使各個線程實現互斥的效果。由它來保護臨界區每次只能由一個線程進入,當一個線程想要進入臨界區之前需要先搶鎖(加鎖),如果能搶到鎖就進入臨界區工作,並且要在離開的時候解鎖以便讓其它線程可以搶到鎖進入臨界區;如果沒有搶到鎖則進入阻塞狀態等待鎖被釋放然後再搶鎖。
要在進入臨界區之前加鎖,在退出臨界區的時候解鎖。
與 ptread_t 一樣,互斥量也使用一種數據類型來表示,它使用 pthread_mutex_t 類型來表示。
初始化互斥量有兩種方式:
1)用宏初始化:如同使用默認屬性;
2)使用 pthread_mutex_init(3) 函數初始化,可以為互斥量指定屬性。
pthread_mutex_t 使用完成之後需要使用 pthread_mutex_destroy(3) 函數銷毀,否則會導致內存洩漏。
一般什麼情況使用宏初始化,什麼情況使用函數初始化互斥量呢?請看下面的偽代碼:
1 /* 定義並賦值 */ 2 type name = value; // 定義並賦值。使用 PTHREAD_MUTEX_INITIALIZER 宏初始化互斥量必須在這種情況時。 3 4 /* 先定義再賦值 */ 5 type name; // 定義 6 name = valu; // 賦值。這種情況不允許使用 PTHREAD_MUTEX_INITIALIZER 宏初始化互斥量,只能使用 pthread_mutex_init(3) 函數初始化互斥量。
前面說了,要在進入臨界區之前加鎖,在退出臨界區的時候解鎖。我們來了解一下加鎖和解鎖的函數。
1 pthread_mutex_lock, pthread_mutex_trylock, pthread_mutex_unlock - lock and unlock a mutex 2 3 #include <pthread.h> 4 5 int pthread_mutex_lock(pthread_mutex_t *mutex); 6 int pthread_mutex_trylock(pthread_mutex_t *mutex); 7 int pthread_mutex_unlock(pthread_mutex_t *mutex);
_lock() 是阻塞加鎖,當搶鎖的時候被搶不到就死等,直到別人通過 _unlock() 把這把鎖解鎖再搶。
_trylock() 是嘗試加鎖,無論能否搶到鎖都返回。
臨界區是每個線程要單獨執行的,所以臨界區中的代碼執行時間越短越好。
了解了互斥量之後,我們再來看一道經典的面試題:用 4 個線程瘋狂的打印 abcd 持續 5 秒鐘,但是要按照順序打印,不能是亂序的。
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <pthread.h> 4 #include <string.h> 5 6 #define THRNUM 4 7 8 static pthread_mutex_t mut[THRNUM]; 9 10 static int next(int a) 11 { 12 if(a+1 == THRNUM) 13 return 0; 14 return a+1; 15 } 16 17 static void *thr_func(void *p) 18 { 19 int n = (int)p; 20 int ch = n + 'a'; 21 22 while(1) 23 { 24 pthread_mutex_lock(mut+n); 25 write(1,&ch,1); 26 pthread_mutex_unlock(mut+next(n)); 27 } 28 pthread_exit(NULL); 29 } 30 31 int main() 32 { 33 int i,err; 34 pthread_t tid[THRNUM]; 35 36 for(i = 0 ; i < THRNUM ; i++) 37 { 38 pthread_mutex_init(mut+i,NULL); 39 40 pthread_mutex_lock(mut+i); 41 42 err = pthread_create(tid+i,NULL,thr_func,(void *)i); 43 if(err) 44 { 45 fprintf(stderr,"pthread_create():%s\n",strerror(err)); 46 exit(1); 47 } 48 49 } 50 51 52 pthread_mutex_unlock(mut+0); 53 54 alarm(5); 55 56 57 for(i = 0 ; i < THRNUM ; i++) 58 pthread_join(tid[i],NULL); 59 60 61 exit(0); 62 }
上面這段代碼是通過多個互斥量實現了一個鎖鏈的結構巧妙的實現了要求的效果。
首先定義 4 個互斥量,然後創建 4 個線程,每個互斥量對應一個線程,每個線程負責打印一個字母。4 個線程剛剛被創建好時,4 把鎖都處於鎖定狀態,4 個線程全部都阻塞在臨界區之外,等 4 個線程全部都創建好之後解鎖其中一把鎖。被解鎖的線程首先將自己的互斥量上鎖,然後打印字符再解鎖下一個線程對應的互斥量,然後再次等待自己被解鎖。如此往復,使 4 個線程有條不紊的循環執行 鎖定自己 --- 打印字符 -- 解鎖下一個線程 的步驟,這樣打印到控制台上的 abcd 就是有序的了。
從上面的栗子可以看出來:互斥量限制的是一段代碼能否執行,而不是一個變量或一個資源。
上面的代碼雖然使用鎖鏈巧妙的完成了任務,但是它的實現方式並不是最漂亮的,更好的辦法我們下面介紹條件變量(pthread_cond_t)的時候會討論。
大家還記得我們在上一篇博文中提到過令牌桶嗎?當時只是實現了一個簡單的令牌桶,這次我們來寫一個通用的多線程並發版的令牌桶。
1 /* mytbf.h */ 2 #ifndef MYTBF_H__ 3 #define MYTBF_H__ 4 5 #define MYTBF_MAX 1024 6 7 typedef void mytbf_t; 8 9 mytbf_t *mytbf_init(int cps,int burst); 10 11 int mytbf_fetchtoken(mytbf_t *,int); 12 13 int mytbf_returntoken(mytbf_t *,int ); 14 15 void mytbf_destroy(mytbf_t *); 16 17 18 #endif
1 /* mytbf.c */ 2 #include <stdio.h> 3 #include <stdlib.h> 4 #include <unistd.h> 5 #include <sys/types.h> 6 #include <sys/stat.h> 7 #include <fcntl.h> 8 #include <errno.h> 9 #include <pthread.h> 10 #include <string.h> 11 12 #include "mytbf.h" 13 14 /* 每一個令牌桶 */ 15 struct mytbf_st 16 { 17 int cps; // 速率 18 int burst; // 令牌上限 19 int token; // 可用令牌數量 20 int pos; // 當前令牌桶在 job 數組中的下標 21 pthread_mutex_t mut; // 用來保護令牌競爭的互斥量 22 }; 23 24 /* 所有的令牌桶 */ 25 static struct mytbf_st *job[MYTBF_MAX]; 26 /* 用來保護令牌桶數組競爭的互斥量 */ 27 static pthread_mutex_t mut_job = PTHREAD_MUTEX_INITIALIZER; 28 /* 添加令牌的線程 ID */ 29 static pthread_t tid; 30 /* 初始化添加令牌的線程 */ 31 static pthread_once_t init_once = PTHREAD_ONCE_INIT; 32 33 /* 線程處理函數:負責定時向令牌桶中添加令牌 */ 34 static void *thr_alrm(void *p) 35 { 36 int i; 37 38 while(1) 39 { 40 pthread_mutex_lock(&mut_job); 41 // 遍歷所有的桶 42 for(i = 0 ; i < MYTBF_MAX; i++) 43 { 44 // 為可用的桶添加令牌 45 if(job[i] != NULL) 46 { 47 pthread_mutex_lock(&job[i]->mut); 48 job[i]->token += job[i]->cps; 49 // 桶中可用的令牌不能超過上限 50 if(job[i]->token > job[i]->burst) 51 job[i]->token = job[i]->burst; 52 pthread_mutex_unlock(&job[i]->mut); 53 } 54 } 55 pthread_mutex_unlock(&mut_job); 56 57 // 等待一秒鐘後繼續添加令牌 58 sleep(1); 59 } 60 61 pthread_exit(NULL); 62 } 63 64 static void module_unload(void) 65 { 66 int i; 67 68 pthread_cancel(tid); 69 pthread_join(tid,NULL); 70 71 pthread_mutex_lock(&mut_job); 72 for(i = 0 ; i < MYTBF_MAX ; i++) 73 { 74 if(job[i] != NULL) 75 { 76 // 互斥量使用完畢不要忘記釋放資源 77 pthread_mutex_destroy(&job[i]->mut); 78 free(job[i]); 79 } 80 } 81 82 pthread_mutex_unlock(&mut_job); 83 84 pthread_mutex_destroy(&mut_job); 85 86 } 87 88 static void module_load(void) 89 { 90 int err; 91 92 err = pthread_create(&tid,NULL,thr_alrm,NULL); 93 if(err) 94 { 95 fprintf(stderr,"pthread_create():%s\n",strerror(err)); 96 exit(1); 97 } 98 99 atexit(module_unload); 100 } 101 102 /* 103 * 為了不破壞調用者對令牌桶操作的原子性, 104 * 在該函數內加鎖可能會導致死鎖, 105 * 所以該函數內部無法加鎖, 106 * 必須在調用該函數之前先加鎖。 107 */ 108 static int get_free_pos_unlocked(void) 109 { 110 int i; 111 112 for(i = 0 ; i < MYTBF_MAX; i++) 113 if(job[i] == NULL) 114 return i; 115 return -1; 116 } 117 118 mytbf_t *mytbf_init(int cps,int burst) 119 { 120 struct mytbf_st *me; 121 int pos; 122 123 pthread_once(&init_once,module_load); 124 125 me = malloc(sizeof(*me)); 126 if(me == NULL) 127 return NULL; 128 129 me->cps = cps; 130 me->burst = burst; 131 me->token = 0; 132 pthread_mutex_init(&me->mut,NULL); 133 134 pthread_mutex_lock(&mut_job); 135 136 pos = get_free_pos_unlocked(); 137 if(pos < 0) 138 { 139 // 帶鎖跳轉不要忘記先解鎖再跳轉 140 pthread_mutex_unlock(&mut_job); 141 free(me); 142 return NULL; 143 } 144 145 me->pos = pos; 146 147 job[pos] = me; 148 149 pthread_mutex_unlock(&mut_job); 150 151 return me; 152 } 153 154 static inline int min(int a,int b) 155 { 156 return (a < b) ? a : b; 157 } 158 159 int mytbf_fetchtoken(mytbf_t *ptr,int size) 160 { 161 int n; 162 struct mytbf_st *me = ptr; 163 164 if(size < 0) 165 return -EINVAL; 166 167 pthread_mutex_lock(&me->mut); 168 // 令牌數量不足,等待令牌被添加進來 169 while(me->token <= 0) 170 { 171 // 先解鎖,出讓調度器讓別人先跑起來,然後再搶鎖檢查令牌是否夠用 172 pthread_mutex_unlock(&me->mut); 173 sched_yield(); 174 pthread_mutex_lock(&me->mut); 175 } 176 177 n = min(me->token,size); 178 179 me->token -= n; 180 181 pthread_mutex_unlock(&me->mut); 182 183 return n; 184 } 185 186 /* 令牌用不完要歸還喲,可不能浪費了 */ 187 int mytbf_returntoken(mytbf_t *ptr,int size) 188 { 189 struct mytbf_st *me = ptr; 190 191 // 逗我玩呢? 192 if(size < 0) 193 return -EINVAL; 194 195 pthread_mutex_lock(&me->mut); 196 197 me->token += size; 198 if(me->token > me->burst) 199 me->token = me->burst; 200 201 pthread_mutex_unlock(&me->mut); 202 203 return size; 204 } 205 206 void mytbf_destroy(mytbf_t *ptr) 207 { 208 struct mytbf_st *me = ptr; 209 210 pthread_mutex_lock(&mut_job); 211 job[me->pos] = NULL; 212 pthread_mutex_unlock(&mut_job); 213 214 pthread_mutex_destroy(&me->mut); 215 free(ptr); 216 }
上面這個令牌桶庫可以支持最多 1024 個桶,也就是可以使用多線程同時操作這 1024 個桶來獲得不同的速率,每個桶的速率是固定的。
這 1024 個桶保存在一個數組中,所以每次訪問桶的時候都需要對它進行加鎖,避免多個線程同時訪問發生競爭。
同樣每個桶也允許使用多個線程同時訪問,所以每個桶中也需要一個互斥量來保障處理令牌的時候不會發生競爭。
寫互斥量的代碼一定要注意臨界區內的所有的跳轉,通常在跳轉之前需要解鎖,避免產生死鎖。常見的跳轉包括 continue; break; return; goto; longjmp(3); 等等,甚至函數調用也是一種跳轉。
當某個函數內包含臨界區,也就是需要加鎖再進入臨界區,但是從程序的布局來看該函數無法加鎖,那麼根據 POSIX 標准的約定,這種函數的命名規則是必須以 _unlocked 作為後綴,所以大家在看到這樣的函數時在調用之前一定要先加鎖。總結起來說就是以這個後綴命名的函數表示函數內需要加鎖但是沒有加鎖,所以調用者需要先加鎖再調用,例如上面代碼中的 get_free_pos_unlocked() 函數。
LZ 來解釋一下上面這個令牌桶中用過的幾個沒見過的函數。
1 sched_yield — yield the processor 2 3 #include <sched.h> 4 5 int sched_yield(void);
sched_yield(2) 這個函數的作用是出讓調度器。在用戶態無法模擬它的實現,它會讓出當前線程所占用的調度器給其它線程使用,而不必等待時間片耗盡才切換調度器,大家暫時可以把它理解成一個很短暫的 sleep(3) 。一般用於在使用一個資源時需要同時獲得多把鎖但是卻沒法一次性獲得全部的鎖的場景下,只要有任何一把鎖沒有搶到,那麼就立即釋放已搶到的鎖,並讓出自己的調度器讓其它線程有機會獲得被自己釋放的鎖。當再次調度到自己時再重新搶鎖,直到能一次性搶到所有的鎖時再進入臨界區,這樣就避免了出現死鎖的情況。
1 pthread_once - dynamic package initialization 2 3 #include <pthread.h> 4 5 int pthread_once(pthread_once_t *once_control, 6 void (*init_routine)(void)); 7 pthread_once_t once_control = PTHREAD_ONCE_INIT;
pthread_once(3) 函數一般用於動態單次初始化,它能保證 init_routine 函數僅被調用一次。
pthread_once_t 只能使用 PTHREAD_ONCE_INIT 宏初始化,沒有提供其它初始化方式。這個與我們前面見到的初始化 pthread_t 和 pthread_nutex_t 不一樣。
上面的代碼中,向令牌桶添加令牌的線程只需要啟動一次,而初始化令牌桶的函數卻在開啟每個令牌桶的時候都需要調用。為了在初始化令牌桶的函數中僅啟動一次添加令牌的線程,采用 pthread_once(3) 函數來創建線程就可以了。這樣之後在第一次調用 mytbf_init() 函數的時候會啟動新線程添加令牌,而後續再調用 mytbf_init() 的時候就不會啟動添加令牌的線程了。
上面代碼中調用 pthread_once(3) 相當於下面的偽代碼:
1 lock(); 2 3 if (init_flag) 4 5 { 6 init_flag = 0; 7 // do sth 8 } 9 10 unlock();
10.條件變量(pthread_cond_t)
上面的程序經過測試,發現 CPU 正在滿負荷工作,說明程序中出現了忙等, 是哪裡出現了忙等呢?其實就是 mytbf_fetchtoken() 函數獲得鎖的時候采用了忙等的方式。前面我們提到過,異步程序有兩種處理方式,一種是通知法,一種是查詢法,我們這裡用的就是查詢法,下面我們把它修改成個通知法來實現。
1 /* mytbf.c */ 2 #include <stdio.h> 3 #include <stdlib.h> 4 #include <unistd.h> 5 #include <sys/types.h> 6 #include <sys/stat.h> 7 #include <fcntl.h> 8 #include <errno.h> 9 #include <pthread.h> 10 #include <string.h> 11 12 #include "mytbf.h" 13 14 /* 每一個令牌桶 */ 15 struct mytbf_st 16 { 17 int cps; // 速率 18 int burst; // 令牌上限 19 int token; // 可用令牌數量 20 int pos; // 當前令牌桶在 job 數組中的下標 21 pthread_mutex_t mut; // 用來保護令牌競爭的互斥量 22 pthread_cond_t cond; // 用於在令牌互斥量狀態改變時發送通知 23 }; 24 25 /* 所有的令牌桶 */ 26 static struct mytbf_st *job[MYTBF_MAX]; 27 /* 用來保護令牌桶數組競爭的互斥量 */ 28 static pthread_mutex_t mut_job = PTHREAD_MUTEX_INITIALIZER; 29 /* 添加令牌的線程 ID */ 30 static pthread_t tid; 31 /* 初始化添加令牌的線程 */ 32 static pthread_once_t init_once = PTHREAD_ONCE_INIT; 33 34 /* 線程處理函數:負責定時向令牌桶中添加令牌 */ 35 static void *thr_alrm(void *p) 36 { 37 int i; 38 39 while(1) 40 { 41 pthread_mutex_lock(&mut_job); 42 // 遍歷所有的桶 43 for(i = 0 ; i < MYTBF_MAX; i++) 44 { 45 // 為可用的桶添加令牌 46 if(job[i] != NULL) 47 { 48 pthread_mutex_lock(&job[i]->mut); 49 job[i]->token += job[i]->cps; 50 // 桶中可用的令牌不能超過上限 51 if(job[i]->token > job[i]->burst) 52 job[i]->token = job[i]->burst; 53 // 令牌添加完畢之後,通知所有等待使用令牌的線程准備搶鎖 54 pthread_cond_broadcast(&job[i]->cond); 55 pthread_mutex_unlock(&job[i]->mut); 56 } 57 } 58 pthread_mutex_unlock(&mut_job); 59 60 // 等待一秒鐘後繼續添加令牌 61 sleep(1); 62 } 63 64 pthread_exit(NULL); 65 } 66 67 static void module_unload(void) 68 { 69 int i; 70 71 pthread_cancel(tid); 72 pthread_join(tid,NULL); 73 74 pthread_mutex_lock(&mut_job); 75 for(i = 0 ; i < MYTBF_MAX ; i++) 76 { 77 if(job[i] != NULL) 78 { 79 // 互斥量和條件變量使用完之後不要忘記釋放資源 80 pthread_mutex_destroy(&job[i]->mut); 81 pthread_cond_destroy(&job[i]->cond); 82 free(job[i]); 83 } 84 } 85 86 pthread_mutex_unlock(&mut_job); 87 88 pthread_mutex_destroy(&mut_job); 89 90 } 91 92 static void module_load(void) 93 { 94 int err; 95 96 err = pthread_create(&tid,NULL,thr_alrm,NULL); 97 if(err) 98 { 99 fprintf(stderr,"pthread_create():%s\n",strerror(err)); 100 exit(1); 101 } 102 103 atexit(module_unload); 104 } 105 106 /* 107 * 為了不破壞調用者對令牌桶操作的原子性, 108 * 在該函數內加鎖可能會導致死鎖, 109 * 所以該函數內部無法加鎖, 110 * 必須在調用該函數之前先加鎖。 111 */ 112 static int get_free_pos_unlocked(void) 113 { 114 int i; 115 116 for(i = 0 ; i < MYTBF_MAX; i++) 117 if(job[i] == NULL) 118 return i; 119 return -1; 120 } 121 122 mytbf_t *mytbf_init(int cps,int burst) 123 { 124 struct mytbf_st *me; 125 int pos; 126 127 pthread_once(&init_once,module_load); 128 129 me = malloc(sizeof(*me)); 130 if(me == NULL) 131 return NULL; 132 133 me->cps = cps; 134 me->burst = burst; 135 me->token = 0; 136 pthread_mutex_init(&me->mut,NULL); 137 pthread_cond_init(&me->cond,NULL); 138 139 pthread_mutex_lock(&mut_job); 140 141 pos = get_free_pos_unlocked(); 142 if(pos < 0) 143 { 144 pthread_mutex_unlock(&mut_job); 145 free(me); 146 return NULL; 147 } 148 149 me->pos = pos; 150 151 job[pos] = me; 152 153 pthread_mutex_unlock(&mut_job); 154 155 return me; 156 } 157 158 static inline int min(int a,int b) 159 { 160 return (a < b) ? a : b; 161 } 162 163 int mytbf_fetchtoken(mytbf_t *ptr,int size) 164 { 165 int n; 166 struct mytbf_st *me = ptr; 167 168 if(size < 0) 169 return -EINVAL; 170 171 pthread_mutex_lock(&me->mut); 172 // 令牌數量不足,等待令牌被添加進來 173 while(me->token <= 0) 174 { 175 /* 176 * 原子化的解鎖、出讓調度器再搶鎖以便工作或等待 177 * 它會等待其它線程發送通知再喚醒 178 * 放在循環中是因為可能同時有多個線程再使用同一個桶, 179 * 被喚醒時未必就能拿得到令牌,所以要直到能拿到令牌再出去工作 180 */ 181 pthread_cond_wait(&me->cond,&me->mut); 182 183 // pthread_mutex_unlock(&me->mut); 184 // sched_yield(); 185 // pthread_mutex_lock(&me->mut); 186 } 187 188 n = min(me->token,size); 189 190 me->token -= n; 191 192 pthread_mutex_unlock(&me->mut); 193 194 return n; 195 } 196 197 /* 令牌用不完要歸還喲,可不能浪費了 */ 198 int mytbf_returntoken(mytbf_t *ptr,int size) 199 { 200 struct mytbf_st *me = ptr; 201 202 // 逗我玩呢? 203 if(size < 0) 204 return -EINVAL; 205 206 pthread_mutex_lock(&me->mut); 207 208 me->token += size; 209 if(me->token > me->burst) 210 me->token = me->burst; 211 212 /* 213 * 令牌歸還完畢,通知其它正在等待令牌的線程趕緊起床,准備搶鎖 214 * 這兩行誰在上面誰在後面都無所謂 215 * 如果先發通知再解鎖,收到通知的線程發現鎖沒有釋放會等待鎖釋放再搶; 216 * 如果先解鎖再發通知,反正已經出了臨界區了, 217 * 就算有線程在通知發出之前搶到了鎖也不會發生競爭, 218 * 大不了其它被喚醒的線程起床之後發現沒有鎖可以搶,那就繼續睡呗。 219 */ 220 pthread_cond_broadcast(&me->cond); 221 pthread_mutex_unlock(&me->mut); 222 223 return size; 224 } 225 226 void mytbf_destroy(mytbf_t *ptr) 227 { 228 struct mytbf_st *me = ptr; 229 230 pthread_mutex_lock(&mut_job); 231 job[me->pos] = NULL; 232 pthread_mutex_unlock(&mut_job); 233 234 pthread_mutex_destroy(&me->mut); 235 pthread_cond_destroy(&me->cond); 236 free(ptr); 237 }
大家不難看出這兩段代碼的差別,把查詢法(忙等)修改為通知法(非忙等)僅僅加一個條件變量(pthread_cond_t) 就行了。
條件變量的作用是什麼?其實就是讓線程以無競爭的形式等待某個條件的發生,當條件發生時通知等待的線程醒來去做某件事。
通知進程醒來有兩種方式,一種是僅通知一個線程醒來,如果有多個線程都在等待,那麼不一定是哪個線程被喚醒;另一種方式是把所有等待同一個條件的線程都喚醒。
在下面我們會介紹這兩種方式,先從條件變量的初始化和銷毀開始討論。
1 pthread_cond_destroy, pthread_cond_init - destroy and initialize condition variables 2 3 #include <pthread.h> 4 5 int pthread_cond_destroy(pthread_cond_t *cond); 6 int pthread_cond_init(pthread_cond_t *restrict cond, 7 const pthread_condattr_t *restrict attr); 8 pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
與互斥量一樣,條件變量也有兩種方式初始化,一種是使用 pthread_cond_init(3) 函數,另一種是使用 PTHREAD_COND_INITIALIZER 宏。這兩種方式的使用場景也與互斥量相同,這裡就不再贅述了。
條件變量在使用完畢之後不要忘記用 pthread_cond_destroy(3) 函數釋放資源,否則會導致內存洩漏!
1 pthread_cond_broadcast, pthread_cond_signal - broadcast or signal a condition 2 3 #include <pthread.h> 4 5 int pthread_cond_broadcast(pthread_cond_t *cond); 6 int pthread_cond_signal(pthread_cond_t *cond);
這兩個函數就是條件變量的關鍵操作了,大家注意看。
pthread_cond_signal(3) 函數用於喚醒當前多個等待的線程中的任何一個。雖然名字上有 signal,但是跟系統中的信號沒有任何關系。
pthread_cond_broadcast(3) 驚群,將現在正在等待的線程全部喚醒。
1 pthread_cond_timedwait, pthread_cond_wait - wait on a condition 2 3 #include <pthread.h> 4 5 int pthread_cond_timedwait(pthread_cond_t *restrict cond, 6 pthread_mutex_t *restrict mutex, 7 const struct timespec *restrict abstime); 8 int pthread_cond_wait(pthread_cond_t *restrict cond, 9 pthread_mutex_t *restrict mutex);
這幾個函數與上面的兩個函數的作用是成對的,上面的兩個函數用於喚醒線程,喚醒什麼線程呢?當然是喚醒 _wait() 等待條件滿足的線程啦。
當一個線程做某件事之前發現條件不滿足,那就使用這幾個 _wait() 函數進入等待狀態,當某個線程使條件滿足了就要用上面的兩個函數喚醒等待的線程繼續工作了。
pthread_cond_wait(3) 在臨界區外阻塞等待某一個條件發生變化,直到有一個通知到來打斷它的等待。這種方式是死等。
pthread_cond_timedwait(3) 增加了超時功能的等待,超時之後無論能否拿到鎖都返回。這種方式是嘗試等。
pthread_cond_wait(3) 相當於下面三行代碼的原子操作:
1 pthread_mutex_unlock(mutex); 2 3 sched_yield(); 4 5 pthread_mutex_lock(mutex);
通常等待會放在一個循環中,就像上面的令牌桶栗子一樣,因為可能有多個線程都在等待條件滿足,當前的線程被喚醒時不代表執行條件一定滿足,可能先被喚醒的線程發現條件滿足已經去工作了,等輪到當前線程調度的時候條件可能就又不滿足了,所以如果條件不滿足需要繼續進入等待。
還記得我們上面提到的面試題嗎?用鎖鏈實現的瘋狂的有序的打印 abcd 5 秒鐘。
上面我們說了,鎖鏈的辦法並不是這道題的考點,這道題真正的考點其實是使用互斥量 + 條件變量的方式來實現。
你想出來如何實現了嗎?自己先寫寫,寫完了再看 LZ 給出的實現。
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <pthread.h> 4 #include <string.h> 5 #include <unistd.h> 6 7 #define THRNUM 4 8 9 static pthread_mutex_t mut_num = PTHREAD_MUTEX_INITIALIZER; 10 static pthread_cond_t cond_num = PTHREAD_COND_INITIALIZER; 11 static int num = 0; 12 13 static int next(int a) 14 { 15 if(a+1 == THRNUM) 16 return 0; 17 return a+1; 18 } 19 20 static void *thr_func(void *p) 21 { 22 int n = (int)p; 23 int ch = n + 'a'; 24 25 while(1) 26 { 27 // 先搶鎖,能搶到鎖就可以獲得打印的機會 28 pthread_mutex_lock(&mut_num); 29 while(num != n) 30 { 31 // 搶到鎖但是發現不應該自己打印,那就釋放鎖再出讓調度器,讓別人嘗試搶鎖 32 pthread_cond_wait(&cond_num,&mut_num); 33 } 34 write(1,&ch,1); 35 num = next(num); 36 /* 37 * 自己打印完了,通知別人你們搶鎖吧 38 * 因為不知道下一個應該運行的線程是誰, 39 * 所以采用驚群的方式把它們全都喚醒, 40 * 讓它們自己檢查是不是該自己運行了。 41 */ 42 pthread_cond_broadcast(&cond_num); 43 pthread_mutex_unlock(&mut_num); 44 } 45 pthread_exit(NULL); 46 } 47 48 int main() 49 { 50 int i,err; 51 pthread_t tid[THRNUM]; 52 53 for(i = 0 ; i < THRNUM ; i++) 54 { 55 // 直接啟動 4 個線程,讓它們自己判斷自己是否應該運行,而不用提前鎖住它們 56 err = pthread_create(tid+i,NULL,thr_func,(void *)i); 57 if(err) 58 { 59 fprintf(stderr,"pthread_create():%s\n",strerror(err)); 60 exit(1); 61 } 62 63 } 64 65 66 alarm(5); 67 68 69 for(i = 0 ; i < THRNUM ; i++) 70 pthread_join(tid[i],NULL); 71 72 73 exit(0); 74 }
代碼中注釋寫得已經比較清晰了,所以 LZ 就不解釋了,小伙伴們要是有什麼看不懂的就在評論中留言討論。
那麼應該在什麼場景中選擇使用 pthread_cond_signal(3) 還是使用 pthread_cond_broadcast(3) 呢?
這個其實沒有固定的套路,要根據具體的場景來選擇。一般只有一個線程在等待或者明確知道哪個線程應該被喚醒的時候使用 _signal() 函數,如果有多個線程在等待並且不確定應該由誰起來工作的時候使用驚群。
LZ 說的不確定是指業務上不能確定哪個線程應該工作,而不是你作為程序猿稀裡糊塗的不知道哪個線程該工作。程序猿應該保證了解你的每一行代碼在做什麼,而不要寫出一坨自己都不知道它在做什麼的代碼。
這段應該補充一個栗子,但是 LZ 忘記案例是什麼了,所以先劃掉,等 LZ 想起來了再補充上來。
A -> B
1 5c
2 15c
3 5c
至於應該先發通知再解鎖還是先解鎖再發通知,效果上沒有太大的區別,這一點在上面令牌桶的栗子中已經闡述了。
11.下面純屬吐槽:
如果面試的時候問你:處理常規任務時,是采用多線程比較快還是采用多進程比較快?
如果只回答多線程比較快,那麼你工資一定多不了。
應該回答常規情況下是多線程較快,因為多進程需要重新布置進程的執行空間,還需要進行數據拷貝以及部分配置,所以會比創建線程慢xx倍。
不要只回答一個大方向就完事了,而是要量化你的答案,這樣才能體現出來你在平時學習工作中很注重這些細節問題。
吐槽完畢。。
12.一個進程最多能創建多少個線程
一個進程能夠創建多少個線程呢?主要受兩個因素影響,一個是 PID 耗盡,一個是我們在之前的博文中畫 C 程序地址空間布局時的陰影區域被棧空間占滿了 。(不記得那副圖了,去前面的博文裡找找。)
PID 看上去是進程 ID,但是在之前討論進程的博文中我們討論過,內核的最小執行單元其實是線程,實際上是線程在消耗 PID。一個系統中的線程可以有很多,所以 PID 被耗盡也是有可能的。
使用 ulimit(1) 命令可以查看棧空間的大小,陰影區剩余空間的大小 / 棧空間的大小 == 就是我們能創建的線程數量。
大家可以自己寫個程序測試一下一個進程最多能夠創建多少個線程,然後使用 ulimit(1) 命令修改棧的大小再測試幾次,看看能有什麼發現。代碼很簡單,LZ 就不貼出來了。
13.管道的特點
1)管道的同義詞是隊列;
2)管道是單工的;
3)管道必須湊齊讀寫雙方,如果只有一方,則阻塞等待。
關於管道的詳細內容,我們在後面討論進程間通信(IPC)的時候還會再詳細討論。