先補充個基礎知識――等待隊列
定義
wait_queue_head_t wait_queue;
初始化
init_waitqueue_head(&wait_queue);
等待
wait_event(queue, condition) 等待某個條件而進入睡眠
wait_event_interruptible(queue, condition) 等待某個條件而進入睡眠並允許信號中斷睡眠
wait_event_timeout(queue, condition,timeout) 等待某個條件而進入睡眠 最多等待timeout時間
wait_event_interruptible_timeout(queue, condition,timeout)
喚醒
void wake_up(wait_queue_head_t *queue); 喚醒阻塞在該等待隊列上的進程
void wake_up_interruptible(wait_queue_head_t *queue);
假設你的設備驅動程序在中斷中接收數據,為用戶空間提供讀取的操作。
你可以這樣處理:
1、為簡單說明,不考慮同步。
read()
{
If(len > 0) {
Read...
Return len;
}else {
Return 0;
}
}
Irq_handler()
{
Recv...
Add Len
}
這是一種非阻塞的實現
2、
Read()
{
If(wait_event_interruptible(wait_queue, len > 0)) {
Return error;
}
Read...
Return len;
}
Irq_handler()
{
recv
Add len
wake_up_interruptible(&wait_queue);
}
利用等待隊列實現的阻塞方式,無數據會把自己放到等待隊列中進入睡眠,當數據到來發生中斷時,在中斷中喚醒睡眠中等待隊列上的進程進行處理。當然阻塞其實是和睡眠無關的,這裡你無數據可以忙等,但睡眠是更優雅的方式。
wait_event
跟進wait_event(queue, condition)會發現他定義了一個wait_queue_t __wait {.private = current, .func = autoremove_wake_function, },然後將__wait放到了等待隊列queue中,即放到了queue的task_list鏈表中。
接下來設置當前進程的狀態為TASK_UNINTERRUPTIBLE,並調用schedule(),調度並切換到一個新的進程開始運行。
設置為TASK_UNINTERRUPTIBLE的進程,不會再被系統調度執行,會一直死在這裡。到此,該進程讓出了CPU不再執行,可以認為他進入了睡眠。
wake_up
跟進wake_up(queue),他其實遍歷queue的task_list鏈表,對每個結點(wait_queue_t類型),調用其func函數。
而此時queue裡面應該放著wait_event時放入的__wait,於是wake_up調用了__wait->func函數,__wait->func即autoremove_wake_function函數。
跟進autoremove_wake_function,發現函數裡面調用了try_to_wake_up,其參數就是__wait中賦予的current值,這樣就實現了在其他進程或中斷中,喚醒之前睡眠的進程。
try_to_wake_up中的處理比較復雜,不再繼續跟了,我們可以確定try_to_wake_up將之前睡眠的進程狀態設為TASK_RUNING,這樣之前的進程就可以繼續被調度執行了,即被喚醒了。
執行完try_to_wake_up後,將__wait從queue中刪除,wake_up的工作就完成了。
再次回到wait_event
之前我們知道,進程在調用schedule後就睡了,然後被其他進程或者中斷wake_up喚醒了,那麼進程喚醒後應該繼續在schedule後繼續執行。
繼續跟進,schedule返回後,會首先判斷條件condition是否成立,如果不成立,再次定義__wait,然後添加到等待隊列,schedule睡眠。如果成立,那麼wait_event執行完成,進程等待的條件滿足,可以繼續處理了。
wait_event_timeout
wait_event_timeout與wait_event的不同是wait_event調用的是schedule,而wait_event_timeout調用的是schedule_timeout。
schedule_timeout裡面又調用了schedule,但在調用之前,他定義了一個定時器,定時器在指定的timeout超時時,調用wake_up_process,進而調用try_to_wake_up喚醒進程。也就是說wait_event_timetou除了依賴於其他進程或中斷喚醒自己,本身還有個定時器可以喚醒自己。
我們知道select同時可以監視多個描述符,只要任一個有事件,就可以直接返回處理。如果都沒有事件則select睡眠等待,並且任一一個描述符有事件就可以喚醒select。其實現是基於等待隊列的。原理簡單的講就是每個描述符都對應一個等待隊列,每個描述符對應的驅動都提供一個poll方法。Select調用描述符的poll方法,檢查是否有事件,當沒有事件時,定義一個wait_queut_t的對象,放到描述符的等待隊列中。當select檢查到沒有事件進入睡眠後,任一個描述符有事件,執行喚醒等待隊列的操作就可以喚醒select。
Select的系統調用sys_select,在fs/select.c中(linux 2.6.27內核),其調用路徑為sys_select -> core_sys_select -> do_select。接下來我們看下slect系統調用的具體實現,代碼比較多,只撿重點的部分看,其他細節有時間再研究。
用戶空間在使用select時,會定義fd_set類型的變量,對應於不同的事件有readset、writeset、exceptset,其實他們都是unsigned long類型的數組,數組中的每一位標識一個fd,我們常用的FD_SET(fd, set),是將set中的數組的第fd位設為1。我們關心fd的那幾個事件,就將相應的set的第fd位置一,傳給內核,通知內核幫我監視,有情況告訴我。通過看內核對fd_set的定義,可以看出fd_set是一個1024位的數組,也就是最多支持1024個fd,如果需要支持更多的fd,需要修改代碼重新編譯內核了。
內核空間中,core_sys_select函數首先定義了一個long類型的數組,如果fd個數多,數組不夠,他會調用kmalloc,動態申請一個數組。數組的使用分為六塊,如下圖所示,每塊其實都是一個小的fd_set,只是fd_set是固定長度(1024位,注意是位不是字節)的數組,但這裡每塊的長度是和真實的fd的個數有關的。
接下來core_sys_select調用get_fd_set將用戶空間傳遞的readset、writeset、exceptset拷貝到in、out、ex中,然後調用do_select,將這個大數組傳給他。do_select通過in、out、ex裡面的位標識,得到要監視哪些fd,監視哪些事件(read、write、except),將監視的結果記錄到res_in、res_out、res_ex中。返回到core_sys_select,程序調用set_fd_set將res_in、res_out、res_ex中的結果,拷貝到用戶空間。select系統調用返回,就獲得事件處理了。
上一步提到了do_select,我們進一步研究研究他。
首先設置當前進程狀態 set_current_state(TASK_INTERRUPTIBLE);(這塊我還不是很了解,內核沒有搶占嗎,如果設置狀態後,切換出去了,豈不永遠都切不回來了,一是此時還沒添加喚醒的處理,不會有其他進程喚醒他,二是CPU不會調度TASK_INTERRUPTIBLE狀態的進程執行。那麼這裡是沒有內核搶占還是設置了TASK_INTERRUPTILBE的進程不會沒搶占?)
然後循環掃描的in、out、ex中的信息(哪些fd關心read事件、哪些fd關心write事件、哪些fd關心except事件),調用具體的fd的驅動相關的poll函數獲取fd的事件的狀態,根據返回的狀態,將結果設置到res_in、res_out、res_ex。其實很簡單,如果in中的第n位為一,標識fd=n的描述符關心read事件,在調用fd=n對應的驅動的poll之後,如果有read事件,則將res_in中的n位置一。
(cond_resched這個函數是做什麼的?)
在處理完一輪後(處理完了in、out、ex中的請求),如果fd請求的事件發生了,則返回,如果都沒有發生則調用schedule_timeout,進入睡眠,等待事件到來時被喚醒。
好,我們看看,do_select是怎樣在有事件時被喚醒的。在這之前,我們先想想如果我們自己來做,如何利用等待隊列實現。大體思路,我們應該定義一個等待隊列wait_queue_head_t queue,select在沒有事件時,定義一個wait_queue_t的對象wait放到queue中,然後調度schedule進入睡眠。在驅動中,當事件到來時,遍歷等待在queue的wait並喚醒。其實內核實現就是這個思路,支持阻塞IO的驅動實現中,通常會定義三個等待隊列,對應於read、write、except,select調用到poll中時,如果沒有事件,會定義一個wait_queue_t的wait放到等待隊列中,當驅動檢查到事件發生時,會喚醒睡在等待隊列上的進程。
接下來看看select在睡之前做了哪些准備工作,怎樣將wait加入到等待隊列中的。
先了解一下do_select中使用的一個數據結構
struct poll_wqueues {
poll_table pt;
struct poll_table_page *table;
struct task_struct *polling_task;
int triggered;
int error;
int inline_index;
struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
}
do_select聲明了這樣類型的一個對象table,然後初始化其成員polling_task = current, pt->qproc = __pollwait。
接下來在調用各fd對應的驅動的poll時,將table.pt(poll_table類型)作為參數傳入。
我們知道各個驅動模塊實現的各自的poll函數中,如果自己沒有read、write、except事件,會調用poll_wait函數,參數wait_address是驅動中聲明的等待隊列,p是調用poll時傳入的table.pt。以下是poll_wait的實現:
static inline void poll_wait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
p->qproc(file, wait_address, p);
}
可以看到poll_wait中調用的p->qproc就是之前初始化poll_wqueue時,指定的__pollwait函數。
static void __pollwait(struct file *filp,wait_queue_head_t *wait_address, poll_table *p);
struct poll_table_entry {
struct file *filp;
unsigned long key;
wait_queue_t wait;
wait_queue_head_t *wait_address;
}
__pollwait中首先獲取一個poll_table_entry類型的變量entry,獲取其實是在poll_wqueue的inline_entries中拿的。然後初始化entry,entry->file = file;entry->key = p->key;entry->wait.func = pollwake,最後將entry->wait添加到等待隊列wait_address中。
所有的准備工作做好了,如果沒有事件產生,do_select調度schedule進入了睡眠。
喚醒一般在中斷或者軟中斷中處理的,一般在檢查到事件到來時,驅動中會調用wake_up函數,參數為驅動中定義的等待隊列。
追蹤wake_up函數,最終調用了__wake_up_common,在這個函數中,遍歷wait_queue_head_t中的結點,每個結點是wait_queue_t類型,調用每個結點的func指針指向的函數。前面我們知道func指針指向了pollwake,pollwake最終通過調用try_wake_up喚醒了進程。
pollwake->__pollwake->default_wake_function->try_to_wake_up
wait_queue_t中記錄了要被喚醒的進程的task_struct結構,因此通過以上系列調用,最終實現了睡眠進程的喚醒。
poll與select的流程基本一致,其調用路徑為sys_poll->do_sys_poll->do_poll->do_pollfd
do_sys_poll將用戶空間的pollfd拷貝到內核空間,初始化poll_wqueues table對象,其使用與select相同。調用do_poll,取得需監視的fd的狀態,然後將狀態拷貝到用戶空間,返回。
do_poll與do_select類似,查詢事件,沒事件睡眠。只是do_poll中使用pollfd,do_select使用long類型中的每一位記錄狀態。
do_pollfd實現對poll的調用,然後將狀態記錄到pollfd中。
我們看看select與poll的不同
select使用fd_set記錄要檢查的描述符,該結構本身是1024位,也就限制了最多只能檢測1024個描述符。
poll使用pollfd結構的數組,檢測多少個描述符,就傳遞多大的數組就可以了。
struct pollfd {
int fd;
short events;
short revents;
};
select使用的fd_set記錄輸入輸出,每次返回後,返回的結果就把系統調用時傳入的信息給覆蓋掉了,因此每次調用select都需要給fd_set賦值。
poll使用pollfd結構,events記錄要檢測的事件,revents記錄結果,pollfd初始化一次就可以了,以後每次poll調用不需要重新初始化pollfd。
不知不覺寫這麼多了,epoll的探究再開一片吧。
由於也是邊查資料邊看代碼邊整理,是一個學習的過程,思路有點跳躍不連貫,歡迎拍磚,接下來我會再次整理,屢屢思路。
linux網絡編程的I/O 多路復用。select()函數是系統提供的,它可以在多個描
述符中選擇被激活的描述符進行操作。
例如:一個進程中有多個客戶連接,即存在多個TCP 套接字描述符。select()函數阻塞
直到任何一個描述符被激活,即有數據傳輸。從而避免了進程為等待一個已連接上的數據而
無法處理其他連接。因而,這是一個時分復用的方法,從用戶角度而言,它實現了一個進程
或線程中的並發處理。
I/O 多路復用技術的最大優勢是系統開銷小,系統不必創建進程、線程,也不必維護這
些進程/線程,從而大大減少了系統的開銷。
select()函數用於實現I/O 多路復用,它允許進程指示系統內核等待多個事件中的任何一
個發生,並僅在一個或多個事情發送或經過某指定的時間後才喚醒進程。
它的原型如下,
#include<sys/time.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set * errorfds, struct timeval *timeout);
ndfs: select() 函數監視描述符數的最大值。根據進程中打開的描述符數而定,一般設為要
監視的描述符的最大數加1。
readfds: select() 函數監視的可讀描述符集合。
writefds: select()函數監視的可寫描述符集合。
errorfds: select()函數監視的異常描述符集合。
timeout: select()函數超時結束時間
返回值。如果成功返回總的位數,這些位對應已准備好的描述符。否則返回-1,並在errno
中設置相應的錯誤碼。
FD_ZERO(fd_set *fdset):清空fdset 與所有描述符的聯系
FD_SET(int fd, fd_set *fdset):建立描述符fd 與fdset 的聯系
FD_CLR(int fd, fd_set *fdset):撤銷描述符fd 與fdset 的聯系
FD_ISSET(int fd,fd_set *fdset) ::檢查與fdset 聯系的描述符fd 是否可讀寫,返回非0表示可讀寫。
采用select()函數實現I/O 多路復用的基本步驟如下:
(1) 清空描述符集合
(2) 建立需要監視的描述符與描述符集合的聯系
(3) 調用select()函數
(4) 檢查所有需要監視的描述符,利用FD_ISSET 判斷是否准備好
(5) 對已准備好的描述符進行I/O 操作
如果是64位的話,一次可以查看1024個。 補助一點,這是可以變動的。select可以通過技術手段改變閘限值,另外poll 函數沒有限制吧