The poll call implements the same functionality as select: both are mechanisms for multiplexing network I/O. Let's first look at how poll is called.
1. The poll call
[cpp]
#include <poll.h>
int poll(struct pollfd fds[], nfds_t nfds, int timeout);
struct pollfd is defined as follows (in the source file poll.h):
[cpp]
struct pollfd {
	int fd;
	short events;
	short revents;
};
In this structure, fd is the file descriptor, events is the set of events to check for, and revents is the set of events reported back after the check; when the state of a file descriptor changes, its revents field becomes non-zero.
2. Parameters
fds: the array of socket descriptors whose state is to be checked. Unlike select (which clears the descriptor set it was given after each call), poll never clears this array; it records state changes in each entry's revents field instead, which makes it more convenient to work with.
nfds: the total number of struct pollfd elements in the fds array.
timeout: how long the poll call may block, in milliseconds (ms).
3. Return value
Greater than 0: some descriptors in fds changed state (became readable, became writable, or hit an error), and the return value is the total number of descriptors with state changes. You can then walk the fds array looking for entries whose revents is non-zero, and check which events are set before reading data.
Equal to 0: no descriptor changed state and the call timed out.
Less than 0: an error occurred; the global variable errno holds the error code.
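To make the parameters and return values concrete, here is a minimal user-space sketch. It waits up to 5 seconds for one socket to become readable; sockfd is assumed to be an already-connected descriptor, and the read handling is purely illustrative:
[cpp]
#include <poll.h>
#include <stdio.h>
#include <unistd.h>

/* Minimal sketch: wait up to 5 seconds for sockfd to become readable. */
int wait_readable(int sockfd)
{
	struct pollfd fds[1];
	int ret;

	fds[0].fd = sockfd;
	fds[0].events = POLLIN;    /* ask to be told about readability */
	fds[0].revents = 0;

	ret = poll(fds, 1, 5000);  /* timeout in milliseconds */
	if (ret < 0) {
		perror("poll");    /* errno holds the error code */
		return -1;
	}
	if (ret == 0)
		return 0;          /* timed out, no state change */

	if (fds[0].revents & POLLIN) {
		char buf[4096];
		read(sockfd, buf, sizeof(buf));  /* data is ready to read */
	}
	return ret;                /* number of descriptors with changes */
}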
4. Kernel implementation
The kernel entry point for the poll system call is sys_poll; its code is as follows:
[cpp]
asmlinkage long sys_poll(struct pollfd __user *ufds, unsigned int nfds,
			long timeout_msecs)
{
	s64 timeout_jiffies;
	int ret;

	if (timeout_msecs > 0) {
#if HZ > 1000
		/* We can only overflow if HZ > 1000 */
		if (timeout_msecs / 1000 > (s64)0x7fffffffffffffffULL / (s64)HZ)
			timeout_jiffies = -1;
		else
#endif
			timeout_jiffies = msecs_to_jiffies(timeout_msecs);
	} else {
		/* Infinite (< 0) or no (0) timeout */
		timeout_jiffies = timeout_msecs;
	}

	ret = do_sys_poll(ufds, nfds, &timeout_jiffies);
	if (ret == -EINTR) {
		struct restart_block *restart_block;

		restart_block = &current_thread_info()->restart_block;
		restart_block->fn = do_restart_poll;
		restart_block->arg0 = (unsigned long)ufds;
		restart_block->arg1 = nfds;
		restart_block->arg2 = timeout_jiffies & 0xFFFFFFFF;
		restart_block->arg3 = (u64)timeout_jiffies >> 32;
		ret = -ERESTART_RESTARTBLOCK;
	}
	return ret;
}
This function is fairly easy to follow. It does three things:
Convert the blocking timeout: the millisecond timeout is translated into jiffies according to the kernel's software-clock frequency (a simplified sketch of this conversion follows this list).
Call do_sys_poll, which does the real work.
Handle pending signals for the current process. This is driven by do_sys_poll's return value: inside that call, the kernel checks whether the current process has unhandled signals, and if so returns -EINTR so they can be processed; sys_poll then returns -ERESTART_RESTARTBLOCK, which causes the call to be restarted.
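As a sketch of the first step: for the common configurations where HZ divides 1000 evenly, msecs_to_jiffies reduces to a round-up division. This is a simplification of the real kernel helper, which also has branches for other HZ values:
[cpp]
/* Simplified sketch of the ms -> jiffies conversion: round up so the
 * caller never sleeps for less than it asked. With HZ = 1000 (1 ms per
 * tick), 5000 ms -> 5000 jiffies; with HZ = 100, 5000 ms -> 500. */
static inline unsigned long msecs_to_jiffies_sketch(unsigned int m)
{
	return (m + (1000 / HZ) - 1) / (1000 / HZ);
}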
Now let's step into do_sys_poll:
[cpp]
int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds, s64 *timeout)
{
	struct poll_wqueues table;
	int err = -EFAULT, fdcount, len, size;
	/* Allocate small arguments on the stack to save memory and be
	   faster - use long to make sure the buffer is aligned properly
	   on 64 bit archs to avoid unaligned access */
	long stack_pps[POLL_STACK_ALLOC/sizeof(long)];
	struct poll_list *const head = (struct poll_list *)stack_pps;
	struct poll_list *walk = head;
	unsigned long todo = nfds;

	if (nfds > current->signal->rlim[RLIMIT_NOFILE].rlim_cur)
		return -EINVAL;

	len = min_t(unsigned int, nfds, N_STACK_PPS);
	for (;;) {
		walk->next = NULL;
		walk->len = len;
		if (!len)
			break;

		if (copy_from_user(walk->entries, ufds + nfds-todo,
					sizeof(struct pollfd) * walk->len))
			goto out_fds;

		todo -= walk->len;
		if (!todo)
			break;

		len = min(todo, POLLFD_PER_PAGE);
		size = sizeof(struct poll_list) + sizeof(struct pollfd) * len;
		walk = walk->next = kmalloc(size, GFP_KERNEL);
		if (!walk) {
			err = -ENOMEM;
			goto out_fds;
		}
	}
	poll_initwait(&table);
	fdcount = do_poll(nfds, head, &table, timeout);
	poll_freewait(&table);

	for (walk = head; walk; walk = walk->next) {
		struct pollfd *fds = walk->entries;
		int j;

		for (j = 0; j < walk->len; j++, ufds++)
			if (__put_user(fds[j].revents, &ufds->revents))
				goto out_fds;
	}

	err = fdcount;
out_fds:
	walk = head->next;
	while (walk) {
		struct poll_list *pos = walk;
		walk = walk->next;
		kfree(pos);
	}
	return err;
}
To speed things up and improve performance, the function first uses a preallocated stack buffer of POLL_STACK_ALLOC bytes (256 on my system). This 256-byte stack buffer is reinterpreted as a struct poll_list and stores the socket descriptors to be checked. struct poll_list is defined as:
[cpp]
struct poll_list {
	struct poll_list *next;
	int len;
	struct pollfd entries[0];
};
As you can see, entries is an array of struct pollfd, which should look familiar: it holds the socket descriptors that the poll call was asked to check. How many struct pollfd entries can the stack buffer allocated above store? That is computed as follows:
[cpp]
len = min_t(unsigned int, nfds, N_STACK_PPS);
Here N_STACK_PPS is precisely the number of struct pollfd entries that the fixed stack buffer can hold:
[cpp]
#define N_STACK_PPS ((sizeof(stack_pps) - sizeof(struct poll_list)) / \
			sizeof(struct pollfd))
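For example, assuming a 64-bit build where sizeof(struct poll_list) is 16 bytes (an 8-byte next pointer plus an int padded for alignment) and sizeof(struct pollfd) is 8 bytes, this gives N_STACK_PPS = (256 - 16) / 8 = 30, so the first 30 descriptors are handled without any kmalloc at all.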
The function then copies len struct pollfd entries into kernel space. A careful reader will ask: what happens if nfds is larger than N_STACK_PPS? Note that the code above is a loop: if nfds exceeds N_STACK_PPS (in practice it usually does), more memory is requested and copying continues, in this fragment:
[cpp]
len = min(todo, POLLFD_PER_PAGE);
size = sizeof(struct poll_list) + sizeof(struct pollfd) * len;
walk = walk->next = kmalloc(size, GFP_KERNEL);
if (!walk) {
	err = -ENOMEM;
	goto out_fds;
}
POLLFD_PER_PAGE is the number of struct pollfd entries that fit in one page of memory: a page is 4 KB and a struct pollfd occupies 8 bytes, so one page can store close to 512 descriptors. If one page is still not enough for nfds entries, no problem: the loop does not exit, it allocates another page, and all the allocated blocks are chained together via struct poll_list, whose next field exists for exactly this purpose.
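The macro itself (in fs/select.c in this kernel generation) makes the bookkeeping explicit; assuming 4096-byte pages and the 64-bit sizes used above, (4096 - 16) / 8 = 510 entries per page, which is where the "close to 512" figure comes from:
[cpp]
#define POLLFD_PER_PAGE  ((PAGE_SIZE-sizeof(struct poll_list)) / \
			sizeof(struct pollfd))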
After this stage we end up with a linked list whose head is the stack_pps buffer and whose subsequent nodes are the page-sized allocations; this list holds every socket descriptor passed to the poll call.
Next comes a very important sequence of calls:
[cpp]
poll_initwait(&table);
fdcount = do_poll(nfds, head, &table, timeout);
poll_freewait(&table);
This is the most important part, because what comes after it is easy to understand. After these calls, two things remain to be done:
Write the revents state of every struct pollfd on the list back to user space (recall that the entries were earlier copied from user space into the kernel; since user applications cannot access kernel addresses, the results must be copied back out).
Free the memory allocated earlier with kmalloc: starting from head at stack_pps, follow the next pointers and free each node in turn.
Now back to the most important part. First look at the poll_initwait call; these are the main data structures involved:
[cpp]
struct poll_wqueues {
	poll_table pt;
	struct poll_table_page *table;
	int error;
	int inline_index;
	struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};

typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);

typedef struct poll_table_struct {
	poll_queue_proc qproc;
} poll_table;
The poll_initwait function is:
[cpp]
void poll_initwait(struct poll_wqueues *pwq)
{
	/* Set the qproc function pointer in the poll_table to __pollwait,
	 * i.e. pwq->pt.qproc = __pollwait. This is a callback function;
	 * the whole mechanism is essentially built on this callback. */
	init_poll_funcptr(&pwq->pt, __pollwait);
	pwq->error = 0;
	pwq->table = NULL;
	pwq->inline_index = 0;
}
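init_poll_funcptr itself (in include/linux/poll.h) is a one-liner that just stores the callback:
[cpp]
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
	pt->qproc = qproc;
}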
So poll_initwait simply initializes the poll_wqueues table, mainly by setting its function pointer to __pollwait. What does that function do? Before answering, let's look at the function called right after poll_initwait, namely do_poll, whose implementation follows.
Note the arguments at the call site: nfds, head, &table, timeout. They are easy to interpret: nfds is the number of struct pollfd entries passed to the poll call; head is the array passed to poll, now represented entirely as the struct poll_list chain analyzed above (remember?); table is the structure we just initialized, which for the moment contains nothing but the pointer to the __pollwait callback; and timeout is the timeout.
[cpp]
static int do_poll(unsigned int nfds, struct poll_list *list,
		   struct poll_wqueues *wait, s64 *timeout)
{
	int count = 0;
	poll_table* pt = &wait->pt;

	/* Optimise the no-wait case */
	if (!(*timeout))
		pt = NULL;

	for (;;) {
		struct poll_list *walk;
		long __timeout;

		set_current_state(TASK_INTERRUPTIBLE);
		for (walk = list; walk != NULL; walk = walk->next) {
			struct pollfd * pfd, * pfd_end;

			pfd = walk->entries;
			pfd_end = pfd + walk->len;
			for (; pfd != pfd_end; pfd++) {
				/*
				 * Fish for events. If we found one, record it
				 * and kill the poll_table, so we don't
				 * needlessly register any other waiters after
				 * this. They'll get immediately deregistered
				 * when we break out and return.
				 */
				if (do_pollfd(pfd, pt)) {
					count++;
					pt = NULL;
				}
			}
		}
		/*
		 * All waiters have already been registered, so don't provide
		 * a poll_table to them on the next loop iteration.
		 */
		pt = NULL;
		if (!count) {
			count = wait->error;
			if (signal_pending(current))
				count = -EINTR;
		}
		if (count || !*timeout)
			break;

		if (*timeout < 0) {
			/* Wait indefinitely */
			__timeout = MAX_SCHEDULE_TIMEOUT;
		} else if (unlikely(*timeout >= (s64)MAX_SCHEDULE_TIMEOUT-1)) {
			/*
			 * Wait for longer than MAX_SCHEDULE_TIMEOUT. Do it in
			 * a loop
			 */
			__timeout = MAX_SCHEDULE_TIMEOUT - 1;
			*timeout -= __timeout;
		} else {
			__timeout = *timeout;
			*timeout = 0;
		}

		__timeout = schedule_timeout(__timeout);
		if (*timeout >= 0)
			*timeout += __timeout;
	}
	__set_current_state(TASK_RUNNING);
	return count;
}
A few points to note about this function:
Signal handling. The function first marks the current process as interruptible by signals, the set_current_state(TASK_INTERRUPTIBLE) line, and later checks for pending signals with signal_pending(current). The point is that even after the poll call has entered sys_poll, the process can still receive external signals and exit the system call early (ordinary system calls are not interrupted mid-flight, which is why they try to return as quickly as possible).
Exit condition of the outer loop. The outer loop exits only at if (count || !*timeout) break; The second condition, the timeout, is easy to understand; what is count? Each call to do_pollfd may increment it: do_pollfd checks whether a socket descriptor's state has changed, and if it has, count goes up by one. So when the inner traversal finishes, count holds the total number of descriptors whose state changed.
The function traverses the linked list headed by head, and within each node it traverses the many struct pollfd entries stored in the struct poll_list's array field entries.
For each struct pollfd it calls do_pollfd (so that function may run many times, once per descriptor you passed in). do_pollfd takes two arguments: the struct pollfd itself, obviously, and the freshly initialized table, the structure that for now holds only the __pollwait callback pointer (remember?).
Let's step into do_pollfd and see what it does:
[cpp]
static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait)
{
	unsigned int mask;
	int fd;

	mask = 0;
	fd = pollfd->fd;
	if (fd >= 0) {
		int fput_needed;
		struct file * file;

		file = fget_light(fd, &fput_needed);
		mask = POLLNVAL;
		if (file != NULL) {
			mask = DEFAULT_POLLMASK;
			if (file->f_op && file->f_op->poll)
				mask = file->f_op->poll(file, pwait);
			/* Mask out unneeded events. */
			mask &= pollfd->events | POLLERR | POLLHUP;
			fput_light(file, fput_needed);
		}
	}
	pollfd->revents = mask;

	return mask;
}
This function is simple. It first maps the socket descriptor (or file handle) to the process's struct file, then makes the core call file->f_op->poll(file, pwait). This is part of the Linux VFS: which implementation gets called depends on the type of the file. If file is a network socket, the poll implemented by the network stack is invoked; if file was opened on a filesystem such as ext3, that filesystem's poll is invoked. We will use tcp_poll as an example to see what a typical poll method does.
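Before reading tcp_poll, it is worth seeing the canonical shape that nearly every f_op->poll method follows: call poll_wait() on the relevant wait queue, then compute and return the ready mask. Here is a hypothetical character-device sketch; struct mydev, its readq wait queue, and its data_ready flag are illustrative assumptions, not real kernel code:
[cpp]
/* Hypothetical driver poll method showing the standard pattern. */
static unsigned int mydev_poll(struct file *filp, poll_table *wait)
{
	struct mydev *dev = filp->private_data;  /* assumed device struct */
	unsigned int mask = 0;

	/* Hook the current process onto the device's wait queue through
	 * the poll_table callback (__pollwait in the poll(2) path). */
	poll_wait(filp, &dev->readq, wait);

	if (dev->data_ready)                     /* assumed ready flag */
		mask |= POLLIN | POLLRDNORM;
	return mask;
}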
Note the parameters below: file and wait are the arguments passed in through the file->f_op->poll call, while struct socket is the process-side representation of the socket connection.
[cpp]
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
	unsigned int mask;
	struct sock *sk = sock->sk;
	struct tcp_sock *tp = tcp_sk(sk);

	poll_wait(file, sk->sk_sleep, wait);
	if (sk->sk_state == TCP_LISTEN)
		return inet_csk_listen_poll(sk);

	/* Socket is not locked. We are protected from async events
	   by poll logic and correct handling of state changes
	   made by another threads is impossible in any case.
	 */

	mask = 0;
	if (sk->sk_err)
		mask = POLLERR;

	/*
	 * POLLHUP is certainly not done right. But poll() doesn't
	 * have a notion of HUP in just one direction, and for a
	 * socket the read side is more interesting.
	 *
	 * Some poll() documentation says that POLLHUP is incompatible
	 * with the POLLOUT/POLLWR flags, so somebody should check this
	 * all. But careful, it tends to be safer to return too many
	 * bits than too few, and you can easily break real applications
	 * if you don't tell them that something has hung up!
	 *
	 * Check-me.
	 *
	 * Check number 1. POLLHUP is _UNMASKABLE_ event (see UNIX98 and
	 * our fs/select.c). It means that after we received EOF,
	 * poll always returns immediately, making impossible poll() on write()
	 * in state CLOSE_WAIT. One solution is evident --- to set POLLHUP
	 * if and only if shutdown has been made in both directions.
	 * Actually, it is interesting to look how Solaris and DUX
	 * solve this dilemma. I would prefer, if PULLHUP were maskable,
	 * then we could set it on SND_SHUTDOWN. BTW examples given
	 * in Stevens' books assume exactly this behaviour, it explains
	 * why PULLHUP is incompatible with POLLOUT. --ANK
	 *
	 * NOTE. Check for TCP_CLOSE is added. The goal is to prevent
	 * blocking on fresh not-connected or disconnected socket. --ANK
	 */
	if (sk->sk_shutdown == SHUTDOWN_MASK || sk->sk_state == TCP_CLOSE)
		mask |= POLLHUP;
	if (sk->sk_shutdown & RCV_SHUTDOWN)
		mask |= POLLIN | POLLRDNORM | POLLRDHUP;

	/* Connected? */
	if ((1 << sk->sk_state) & ~(TCPF_SYN_SENT | TCPF_SYN_RECV)) {
		/* Potential race condition. If read of tp below will
		 * escape above sk->sk_state, we can be illegally awaken
		 * in SYN_* states. */
		if ((tp->rcv_nxt != tp->copied_seq) &&
		    (tp->urg_seq != tp->copied_seq ||
		     tp->rcv_nxt != tp->copied_seq + 1 ||
		     sock_flag(sk, SOCK_URGINLINE) || !tp->urg_data))
			mask |= POLLIN | POLLRDNORM;

		if (!(sk->sk_shutdown & SEND_SHUTDOWN)) {
			if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
				mask |= POLLOUT | POLLWRNORM;
			} else {  /* send SIGIO later */
				set_bit(SOCK_ASYNC_NOSPACE,
					&sk->sk_socket->flags);
				set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);

				/* Race breaker. If space is freed after
				 * wspace test but before the flags are set,
				 * IO signal will be lost.
				 */
				if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
					mask |= POLLOUT | POLLWRNORM;
			}
		}

		if (tp->urg_data & TCP_URG_VALID)
			mask |= POLLPRI;
	}
	return mask;
}
tcp_poll above looks long, but its core call is:
[cpp]
poll_wait(file, sk->sk_sleep, wait);
Here file and wait are the parameters we passed in during the poll call. What is sk->sk_sleep? A short explanation:
sk is obtained as follows:
[cpp]
struct sock *sk = sock->sk;
struct sock is the kernel's representation of the socket connection, and sk->sk_sleep has type wait_queue_head_t: it is the socket's wait queue. Every socket has its own wait queue, maintained by the kernel's struct sock.
In fact, most drivers call this function at this point in their poll implementation; it too is very simple:
[cpp]
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
	if (p && wait_address)
		p->qproc(filp, wait_address, p);
}
Now the turning point: we said earlier that the table's function pointer was initialized to __pollwait, so the call here is really __pollwait(filp, wait_address, p). The parameters are the process's file structure struct file, the wait queue wait_queue_head_t of the socket or device, and the poll_table.
A quick review: everything so far, calling poll, copying the data into the kernel, representing the struct pollfd array as the kernel's struct poll_list chain, initializing the poll_table variable, then calling do_pollfd, exists to check whether each struct pollfd passed to poll has a state change, that is, to invoke the VFS file->f_op->poll function. And that brings us to __pollwait, which adds a new node to the wait queue.
The implementation of __pollwait:
[cpp]
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
				poll_table *p)
{
	struct poll_table_entry *entry = poll_get_entry(p);

	if (!entry)
		return;
	get_file(filp);
	entry->filp = filp;
	entry->wait_address = wait_address;
	init_waitqueue_entry(&entry->wait, current);
	add_wait_queue(wait_address, &entry->wait);
}
Let's analyze what the kernel does when __pollwait runs. First look at poll_get_entry(p):
[cpp]
static struct poll_table_entry *poll_get_entry(poll_table *_p)
{
	struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt);
	struct poll_table_page *table = p->table;

	if (p->inline_index < N_INLINE_POLL_ENTRIES)
		return p->inline_entries + p->inline_index++;

	if (!table || POLL_TABLE_FULL(table)) {
		struct poll_table_page *new_table;

		new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
		if (!new_table) {
			p->error = -ENOMEM;
			__set_current_state(TASK_RUNNING);
			return NULL;
		}
		new_table->entry = new_table->entries;
		new_table->next = table;
		p->table = new_table;
		table = new_table;
	}

	return table->entry++;
}
This function creates struct poll_table_page structures as needed. Because __pollwait can be called many times, there may be several struct poll_table_page structures; each is a wrapper around struct poll_table_entry. Both are shown below:
[cpp]
struct poll_table_page {
	struct poll_table_page * next;
	struct poll_table_entry * entry;
	struct poll_table_entry entries[0];
};

struct poll_table_entry {
	struct file * filp;
	wait_queue_t wait;
	wait_queue_head_t * wait_address;
};
So each call to poll_get_entry returns a fresh poll_table_entry; one is produced for every __pollwait invocation. Next, init_waitqueue_entry binds this new struct poll_table_entry to the current process, and add_wait_queue puts the entry on the socket's wait queue. This ties the current process to the socket's wait queue; in plain terms, current is hung on the queue.
Then, as soon as data becomes ready, the processes on the wait queue are woken up. See the code:
[cpp]
static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
{
	q->flags = 0;
	q->private = p;
	q->func = default_wake_function;
}
At the same time, this registers the wake-up function that runs when data becomes ready:
[cpp]
int default_wake_function(wait_queue_t *curr, unsigned mode, int sync,
			  void *key)
{
	return try_to_wake_up(curr->private, mode, sync);
}
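Who performs the wake-up? When data arrives, the protocol code invokes the socket's data-ready callback, which in this generation of the kernel looks roughly like the following (a sketch based on sock_def_readable in net/core/sock.c; details vary between versions). It is this wake_up_interruptible call on sk->sk_sleep that fires the default_wake_function registered above:
[cpp]
/* Sketch of the default data-ready callback on a struct sock. */
static void sock_def_readable(struct sock *sk, int len)
{
	read_lock(&sk->sk_callback_lock);
	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
		wake_up_interruptible(sk->sk_sleep);  /* wakes the pollers */
	sk_wake_async(sk, 1, POLL_IN);  /* SIGIO for async users */
	read_unlock(&sk->sk_callback_lock);
}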
That completes the call path. Let's review the whole flow:
Call the poll function.
Enter sys_poll and the related kernel calls.
Prepare the data: register __pollwait (done by initializing poll_wqueues), copy the data into the kernel, and reorganize it into the struct poll_list chain.
Loop over all the struct pollfd entries, calling do_pollfd on each.
do_pollfd calls the file->f_op->poll function.
That in turn calls __pollwait, which creates a struct poll_table_entry and binds it to the current process.
The current process is hung on the socket's wait queue.
When data becomes ready, the process is woken up.