程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> 關於C語言 >> 【轉】linux expoll模型,linuxexpoll模型

【轉】linux expoll模型,linuxexpoll模型

編輯:關於C語言

【轉】linux expoll模型,linuxexpoll模型


原文地址:http://www.cnblogs.com/venow/archive/2012/11/30/2790031.html

定義:

  epoll是Linux內核為處理大批句柄而作改進的poll,是Linux下多路復用IO接口select/poll的增強版本,它能顯著的減少程序在大量並發連接中只有少量活躍的情況下的系統CPU利用率。因為它會復用文件描述符集合來傳遞結果而不是迫使開發者每次等待事件之前都必須重新准備要被偵聽的文件描述符集合,另一個原因就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入Ready隊列的描述符集合就行了。epoll除了提供select\poll那種IO事件的電平觸發(Level Triggered)外,還提供了邊沿觸發(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態,減少epoll_wait/epoll_pwait的調用,提供應用程序的效率。

工作方式:

  LT(level triggered):水平觸發,缺省方式,同時支持block和no-block socket,在這種做法中,內核告訴我們一個文件描述符是否被就緒了,如果就緒了,你就可以對這個就緒的fd進行IO操作。如果你不作任何操作,內核還是會繼續通知你的,所以,這種模式編程出錯的可能性較小。傳統的select\poll都是這種模型的代表。

  ET(edge-triggered):邊沿觸發,高速工作方式,只支持no-block socket。在這種模式下,當描述符從未就緒變為就緒狀態時,內核通過epoll告訴你。然後它會假設你知道文件描述符已經就緒,並且不會再為那個描述符發送更多的就緒通知,直到你做了某些操作導致那個文件描述符不再為就緒狀態了(比如:你在發送、接受或者接受請求,或者發送接受的數據少於一定量時導致了一個EWOULDBLOCK錯誤)。但是請注意,如果一直不對這個fs做IO操作(從而導致它再次變成未就緒狀態),內核不會發送更多的通知。

  區別:LT事件不會丟棄,而是只要讀buffer裡面有數據可以讓用戶讀取,則不斷的通知你。而ET則只在事件發生之時通知。

使用方式:

  1、int epoll_create(int size)

創建一個epoll句柄,參數size用來告訴內核監聽的數目。

  2、int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

epoll事件注冊函數,

  參數epfd為epoll的句柄;

  參數op表示動作,用3個宏來表示:EPOLL_CTL_ADD(注冊新的fd到epfd),EPOLL_CTL_MOD(修改已經注冊的fd的監聽事件),EPOLL_CTL_DEL(從epfd刪除一個fd);

  參數fd為需要監聽的標示符;

  參數event告訴內核需要監聽的事件,event的結構如下:

struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};

  其中events可以用以下幾個宏的集合:

  EPOLLIN :表示對應的文件描述符可以讀(包括對端SOCKET正常關閉)

  EPOLLOUT:表示對應的文件描述符可以寫

  EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這裡應該表示有帶外數據到來)

  EPOLLERR:表示對應的文件描述符發生錯誤

  EPOLLHUP:表示對應的文件描述符被掛斷;

  EPOLLET: 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的

  EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列裡

3、 int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

  等待事件的產生,類似於select()調用。參數events用來從內核得到事件的集合,maxevents告之內核這個events有多大,這個maxevents的值不能大於創建epoll_create()時的size,參數timeout是超時時間(毫秒,0會立即返回,-1將不確定,也有說法說是永久阻塞)。該函數返回需要處理的事件數目,如返回0表示已超時。

應用舉例:

  下面,我引用google code中別人寫的一個簡單程序來進行說明。svn路徑:http://sechat.googlecode.com/svn/trunk/

  該程序一個簡單的聊天室程序,用Linux C++寫的,服務器主要是用epoll模型實現,支持高並發,我測試在有10000個客戶端連接服務器的時候,server處理時間不到1秒,當然客戶端只是與服務器連接之後,接受服務器的歡迎消息而已,並沒有做其他的通信。雖然程序比較簡單,但是在我們考慮服務器高並發時也提供了一個思路。在這個程序中,我已經把所有的調試信息和一些與epoll無關的信息干掉,並添加必要的注釋,應該很容易理解。

  程序共包含2個頭文件和3個cpp文件。其中3個cpp文件中,每一個cpp文件都是一個應用程序,server.cpp:服務器程序,client.cpp:單個客戶端程序,tester.cpp:模擬高並發,開啟10000個客戶端去連服務器。

  utils.h頭文件,就包含一個設置socket為不阻塞函數,如下:

int setnonblocking(int sockfd)
{
    CHK(fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK));
    return 0;
}

  local.h頭文件,一些常量的定義和函數的聲明,如下:

復制代碼
#define BUF_SIZE 1024                 //默認緩沖區
#define SERVER_PORT 44444             //監聽端口
#define SERVER_HOST "192.168.34.15"   //服務器IP地址
#define EPOLL_RUN_TIMEOUT -1          //epoll的超時時間
#define EPOLL_SIZE 10000              //epoll監聽的客戶端的最大數目

#define STR_WELCOME "Welcome to seChat! You ID is: Client #%d"
#define STR_MESSAGE "Client #%d>> %s"
#define STR_NOONE_CONNECTED "Noone connected to server except you!"
#define CMD_EXIT "EXIT"

//兩個有用的宏定義:檢查和賦值並且檢測
#define CHK(eval) if(eval < 0){perror("eval"); exit(-1);}
#define CHK2(res, eval) if((res = eval) < 0){perror("eval"); exit(-1);}

//================================================================================================
//函數名:                  setnonblocking
//函數描述:                設置socket為不阻塞
//輸入:                    [in] sockfd socket標示符
//輸出:                    無
//返回:                    0
//================================================================================================
int setnonblocking(int sockfd);

//================================================================================================ //函數名: handle_message //函數描述: 處理每個客戶端socket //輸入: [in] new_fd socket標示符 //輸出: 無 //返回: 返回從客戶端接受的數據的長度 //================================================================================================ int handle_message(int new_fd);
復制代碼

  server.cpp文件,epoll模型就在這裡實現,如下:

復制代碼
#include "local.h"
#include "utils.h"

using namespace std;

// 存放客戶端socket描述符的list
list<int> clients_list;

int main(int argc, char *argv[])
{
    int listener;   //監聽socket
    struct sockaddr_in addr, their_addr;  
    addr.sin_family = PF_INET;
    addr.sin_port = htons(SERVER_PORT);
    addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
    socklen_t socklen;
    socklen = sizeof(struct sockaddr_in);

    static struct epoll_event ev, events[EPOLL_SIZE];
    ev.events = EPOLLIN | EPOLLET;     //對讀感興趣,邊沿觸發

    char message[BUF_SIZE];

    int epfd;  //epoll描述符
    clock_t tStart;  //計算程序運行時間

    int client, res, epoll_events_count;

    CHK2(listener, socket(PF_INET, SOCK_STREAM, 0));             //初始化監聽socket
    setnonblocking(listener);                                    //設置監聽socket為不阻塞
    CHK(bind(listener, (struct sockaddr *)&addr, sizeof(addr))); //綁定監聽socket
    CHK(listen(listener, 1));                                    //設置監聽

    CHK2(epfd,epoll_create(EPOLL_SIZE));                         //創建一個epoll描述符,並將監聽socket加入epoll
    ev.data.fd = listener;
    CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &ev));

    while(1)
    {
        CHK2(epoll_events_count,epoll_wait(epfd, events, EPOLL_SIZE, EPOLL_RUN_TIMEOUT));
        tStart = clock();
        for(int i = 0; i < epoll_events_count ; i++)
        {
            if(events[i].data.fd == listener)                    //新的連接到來,將連接添加到epoll中,並發送歡迎消息
            {
                CHK2(client,accept(listener, (struct sockaddr *) &their_addr, &socklen));
                setnonblocking(client);
                ev.data.fd = client;
                CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev));

                clients_list.push_back(client);                  // 添加新的客戶端到list
                bzero(message, BUF_SIZE);
                res = sprintf(message, STR_WELCOME, client);
                CHK2(res, send(client, message, BUF_SIZE, 0));

            }else 
            {
                CHK2(res,handle_message(events[i].data.fd)); //注意:這裡並沒有調用epoll_ctl重新設置socket的事件類型,但還是可以繼續收到客戶端發送過來的信息
            }
        }
        printf("Statistics: %d events handled at: %.2f second(s)\n", epoll_events_count, (double)(clock() - tStart)/CLOCKS_PER_SEC);
    }

    close(listener);
    close(epfd);

    return 0;
}

int handle_message(int client)  
{
    char buf[BUF_SIZE], message[BUF_SIZE];
    bzero(buf, BUF_SIZE);
    bzero(message, BUF_SIZE);

    int len;

    CHK2(len,recv(client, buf, BUF_SIZE, 0));  //接受客戶端信息

    if(len == 0)   //客戶端關閉或出錯,關閉socket,並從list移除socket
    {
        CHK(close(client));
        clients_list.remove(client);
    }
    else          //向客戶端發送信息
    { 
        if(clients_list.size() == 1) 
        { 
            CHK(send(client, STR_NOONE_CONNECTED, strlen(STR_NOONE_CONNECTED), 0));
                return len;
        }
        
        sprintf(message, STR_MESSAGE, client, buf);
        list<int>::iterator it;
        for(it = clients_list.begin(); it != clients_list.end(); it++)
        {
           if(*it != client)
           { 
                CHK(send(*it, message, BUF_SIZE, 0));
           }
        }
    }

    return len;
}
復制代碼

  tester.cpp文件,模擬服務器的高並發,開啟10000個客戶端去連接服務器,如下:

復制代碼
#include "local.h"
#include "utils.h"

using namespace std;

char message[BUF_SIZE];     //接受服務器信息
list<int> list_of_clients;  //存放所有客戶端
int res;
clock_t tStart;

int main(int argc, char *argv[])
{
    int sock; 
    struct sockaddr_in addr;
    addr.sin_family = PF_INET;
    addr.sin_port = htons(SERVER_PORT);
    addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
    
    tStart = clock();

    for(int i=0 ; i<EPOLL_SIZE; i++)  //生成EPOLL_SIZE個客戶端,這裡是10000個,模擬高並發
    {
       CHK2(sock,socket(PF_INET, SOCK_STREAM, 0));
       CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);
       list_of_clients.push_back(sock);

       bzero(&message, BUF_SIZE);
       CHK2(res,recv(sock, message, BUF_SIZE, 0));
       printf("%s\n", message);
    }
   
    list<int>::iterator it;          //移除所有客戶端
    for(it = list_of_clients.begin(); it != list_of_clients.end() ; it++)
       close(*it);

    printf("Test passed at: %.2f second(s)\n", (double)(clock() - tStart)/CLOCKS_PER_SEC); 
    printf("Total server connections was: %d\n", EPOLL_SIZE);
    
    return 0;
}
復制代碼

  我就不給出程序的執行結果的截圖了,不過下面這張截圖是代碼作者自己測試的,可以看出,並發10000無壓力呀 

  單個客戶端去連接服務器,client.cpp文件,如下:

復制代碼
#include "local.h"
#include "utils.h"

using namespace std;

char message[BUF_SIZE];

/*
    流程:
        調用fork產生兩個進程,兩個進程通過管道進行通信
        子進程:等待客戶輸入,並將客戶輸入的信息通過管道寫給父進程
        父進程:接受服務器的信息並顯示,將從子進程接受到的信息發送給服務器
*/
int main(int argc, char *argv[])
{
    int sock, pid, pipe_fd[2], epfd;

    struct sockaddr_in addr;
    addr.sin_family = PF_INET;
    addr.sin_port = htons(SERVER_PORT);
    addr.sin_addr.s_addr = inet_addr(SERVER_HOST);

    static struct epoll_event ev, events[2]; 
    ev.events = EPOLLIN | EPOLLET;

    //退出標志
    int continue_to_work = 1;

    CHK2(sock,socket(PF_INET, SOCK_STREAM, 0));
    CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);

    CHK(pipe(pipe_fd));
    
    CHK2(epfd,epoll_create(EPOLL_SIZE));

    ev.data.fd = sock;
    CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev));

    ev.data.fd = pipe_fd[0];
    CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd[0], &ev));

    // 調用fork產生兩個進程
    CHK2(pid,fork());
    switch(pid)
    {
        case 0:                   // 子進程
            close(pipe_fd[0]);    // 關閉讀端
            printf("Enter 'exit' to exit\n");
            while(continue_to_work)
            {
                bzero(&message, BUF_SIZE);
                fgets(message, BUF_SIZE, stdin);

                // 當收到exit命令時,退出
                if(strncasecmp(message, CMD_EXIT, strlen(CMD_EXIT)) == 0)
                {
                    continue_to_work = 0;
                }
                else
                {            
                    CHK(write(pipe_fd[1], message, strlen(message) - 1));
                }
            }
            break;
        default:                 // 父進程
            close(pipe_fd[1]);   // 關閉寫端
            int epoll_events_count, res;
            while(continue_to_work) 
            {
                CHK2(epoll_events_count,epoll_wait(epfd, events, 2, EPOLL_RUN_TIMEOUT));

                for(int i = 0; i < epoll_events_count ; i++)
                {
                    bzero(&message, BUF_SIZE);
                    if(events[i].data.fd == sock)   //從服務器接受信息
                    {
                        CHK2(res,recv(sock, message, BUF_SIZE, 0));
                        if(res == 0)               //服務器已關閉
                        {
                            CHK(close(sock));
                            continue_to_work = 0;
                        }
                        else 
                        {
                            printf("%s\n", message);
                        }
                    }
                    else        //從子進程接受信息
                    {
                        CHK2(res, read(events[i].data.fd, message, BUF_SIZE));
                        if(res == 0)
                        {
                            continue_to_work = 0; 
                        }
                        else
                        {
                            CHK(send(sock, message, BUF_SIZE, 0));
                        }
                    }
                }
            }
    }
    if(pid)
    {
        close(pipe_fd[0]);
        close(sock);
    }else
    {
        close(pipe_fd[1]);
    }

    return 0;
}
復制代碼

 

   

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved