開發高性能網絡程序時,windows開發者們言必稱iocp,linux開發者們則言必稱epoll。大家都明白epoll是一種IO多路復用技術,可以非常高效的處理數以百萬計的socket句柄,比起以前的select和poll效率高大發了。我們用起epoll來都感覺挺爽,確實快,那么,它到底為什么可以高速處理這么多并發連接呢?
先簡單回顧下如何使用C庫封裝的3個epoll系統調用吧。
[cpp]view plaincopy
?
1????????int epoll_create(int size);?
2????????int epoll_ctl(int epfd, int op, int fd, struct epoll_event*event);?
3????????int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);?
????? 使用起來很清晰,首先要調用epoll_create建立一個epoll對象。參數size是內核保證能夠正確處理的最大句柄數,多于這個最大數時內核可不保證效果。
epoll_ctl可以操作上面建立的epoll,例如,將剛建立的socket加入到epoll中讓其監控,或者把 epoll正在監控的某個socket句柄移出epoll,不再監控它等等。
epoll_wait在調用時,在給定的timeout時間內,當在監控的所有句柄中有事件發生時,就返回用戶態的進程。
????? 從上面的調用方式就可以看到epoll比select/poll的優越之處:因為后者每次調用時都要傳遞你所要監控的所有socket給select/poll系統調用,這意味著需要將用戶態的socket列表copy到內核態,如果以萬計的句柄會導致每次都要copy幾十幾百KB的內存到內核態,非常低效。而我們調用epoll_wait時就相當于以往調用select/poll,但是這時卻不用傳遞socket句柄給內核,因為內核已經在epoll_ctl中拿到了要監控的句柄列表。
????? 所以,實際上在你調用epoll_create后,內核就已經在內核態開始準備幫你存儲要監控的句柄了,每次調用epoll_ctl只是在往內核的數據結構里塞入新的socket句柄。
????? 在內核里,一切皆文件。所以,epoll向內核注冊了一個文件系統,用于存儲上述的被監控socket。當你調用epoll_create時,就會在這個虛擬的epoll文件系統里創建一個file結點。當然這個file不是普通文件,它只服務于epoll。
????? epoll在被內核初始化時(操作系統啟動),同時會開辟出epoll自己的內核高速cache區,用于安置每一個我們想監控的socket,這些socket會以紅黑樹的形式保存在內核cache里,以支持快速的查找、插入、刪除。這個內核高速cache區,就是建立連續的物理內存頁,然后在之上建立slab層,簡單的說,就是物理上分配好你想要的size的內存對象,每次使用時都是使用空閑的已分配好的對象。
[cpp]view plaincopy
?
4????????staticint __init eventpoll_init(void)?
5????????{?
6????????????......?
7??????????
8????????????/* Allocates slabcache used to allocate "struct epitem" items */?
9????????????epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),?
10????? ????????????0,SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC,?
11????? ????????????NULL, NULL);?
12????? ??
13????? ????/* Allocates slab cache used to allocate"struct eppoll_entry" */?
14????? ????pwq_cache = kmem_cache_create("eventpoll_pwq",?
15????? ????????????sizeof(struct eppoll_entry),0,?
16????? ????????????EPI_SLAB_DEBUG|SLAB_PANIC, NULL,NULL);?
17????? ??
18????? ?... ...?
?
????? epoll的高效就在于,當我們調用epoll_ctl往里塞入百萬個句柄時,epoll_wait仍然可以飛快的返回,并有效的將發生事件的句柄給我們用戶。這是由于我們在調用epoll_create時,內核除了幫我們在epoll文件系統里建了個file結點,在內核cache里建了個紅黑樹用于存儲以后epoll_ctl傳來的socket外,還會再建立一個list鏈表,用于存儲準備就緒的事件,當epoll_wait調用時,僅僅觀察這個list鏈表里有沒有數據即可。有數據就返回,沒有數據就sleep,等到timeout時間到后即使鏈表沒數據也返回。所以,epoll_wait非常高效。
????? 而且,通常情況下即使我們要監控百萬計的句柄,大多一次也只返回很少量的準備就緒句柄而已,所以,epoll_wait僅需要從內核態copy少量的句柄到用戶態而已,如何能不高效?!
????? 那么,這個準備就緒list鏈表是怎么維護的呢?當我們執行epoll_ctl時,除了把socket放到epoll文件系統里file對象對應的紅黑樹上之外,還會給內核中斷處理程序注冊一個回調函數,告訴內核,如果這個句柄的中斷到了,就把它放到準備就緒list鏈表里。所以,當一個socket上有數據到了,內核在把網卡上的數據copy到內核中后就來把socket插入到準備就緒鏈表里了。
????? 如此,一顆紅黑樹,一張準備就緒句柄鏈表,少量的內核cache,就幫我們解決了大并發下的socket處理問題。執行epoll_create時,創建了紅黑樹和就緒鏈表,執行epoll_ctl時,如果增加socket句柄,則檢查在紅黑樹中是否存在,存在立即返回,不存在則添加到樹干上,然后向內核注冊回調函數,用于當中斷事件來臨時向準備就緒鏈表中插入數據。執行epoll_wait時立刻返回準備就緒鏈表里的數據即可。
????? 最后看看epoll獨有的兩種模式LT和ET。無論是LT和ET模式,都適用于以上所說的流程。區別是,LT模式下,只要一個句柄上的事件一次沒有處理完,會在以后調用epoll_wait時次次返回這個句柄,而ET模式僅在第一次返回。
??? ??這件事怎么做到的呢?當一個socket句柄上有事件時,內核會把該句柄插入上面所說的準備就緒list鏈表,這時我們調用epoll_wait,會把準備就緒的socket拷貝到用戶態內存,然后清空準備就緒list鏈表,最后,epoll_wait干了件事,就是檢查這些socket,如果不是ET模式(就是LT模式的句柄了),并且這些socket上確實有未處理的事件時,又把該句柄放回到剛剛清空的準備就緒鏈表了。所以,非ET的句柄,只要它上面還有事件,epoll_wait每次都會返回。而ET模式的句柄,除非有新中斷到,即使socket上的事件沒有處理完,也是不會次次從epoll_wait返回的
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>
#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10
int setnonblocking( int fd )
{
??? int old_option = fcntl( fd, F_GETFL );
??? int new_option = old_option | O_NONBLOCK;
??? fcntl( fd, F_SETFL, new_option );
??? return old_option;
}
//添加fd
void addfd( int epollfd, int fd, bool enable_et )
{
??? epoll_event event;
??? event.data.fd = fd;
??? event.events = EPOLLIN;
?? ?//是否開啟et模式
??? if( enable_et )
??? {
??????? event.events |= EPOLLET;
??? }
??? epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
??? setnonblocking( fd );
}
//LT模式的工作原理
void lt( epoll_event* events, int number, int epollfd, int listenfd )
{
??? char buf[ BUFFER_SIZE ];
??? for ( int i = 0; i < number; i++ )
??? {
??????? int sockfd = events[i].data.fd;
?? ??? ?//處理用戶注冊事件
??????? if ( sockfd == listenfd )
??????? {
??????????? struct sockaddr_in client_address;
??????????? socklen_t client_addrlength = sizeof( client_address );
??????????? int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
??????????? addfd( epollfd, connfd, false );
??????? }//可讀事件
??????? else if ( events[i].events & EPOLLIN )
??????? {
?? ??? ??? ?//只要socket讀緩存中還有未讀出的數據,這段代碼就被觸發
??????????? printf( "event trigger once\n" );
??????????? memset( buf, '\0', BUFFER_SIZE );
??????????? int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
??????????? if( ret <= 0 )
??????????? {
??????????????? close( sockfd );//表明當前套接字已經關閉
??????????????? continue;
??????????? }
??????????? printf( "get %d bytes of content: %s\n", ret, buf );
??????? }
??????? else
??????? {
??????????? printf( "something else happened \n" );
??????? }
??? }
}
//ET模式
void et( epoll_event* events, int number, int epollfd, int listenfd )
{
??? char buf[ BUFFER_SIZE ];
??? for ( int i = 0; i < number; i++ )
??? {
??????? int sockfd = events[i].data.fd;
??????? if ( sockfd == listenfd )
??????? {
??????????? struct sockaddr_in client_address;
??????????? socklen_t client_addrlength = sizeof( client_address );
??????????? int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
??????????? addfd( epollfd, connfd, true );
??????? }
??????? else if ( events[i].events & EPOLLIN )
??????? {
??????????? printf( "event trigger once\n" );
?? ??? ??? ?//這段代碼不會重復觸發,因此需要我們循環讀取數據,以確保socket讀緩存中的所有數據全部讀出
??????????? while( 1 )
??????????? {
??????????????? memset( buf, '\0', BUFFER_SIZE );
??????????????? int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
??????????????? if( ret < 0 )
??????????????? {
?? ??? ??? ??? ??? ?//對于非阻塞IO,條件成立表明數據全部讀取完畢,此后。epoll就能再次觸發
?? ??? ??? ??? ??? ?//sockfd上的epollin事件,以驅動下一次讀事件
??????????????????? if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
??????????????????? {
??????????????????????? printf( "read later\n" );
??????????????????????? break;
??????????????????? }
??????????????????? close( sockfd );
??????????????????? break;
??????????????? }
??????????????? else if( ret == 0 )
??????????????? {
??????????????????? close( sockfd );
??????????????? }
??????????????? else
??????????????? {
??????????????????? printf( "get %d bytes of content: %s\n", ret, buf );
??????????????? }
??????????? }
??????? }
??????? else
??????? {
??????????? printf( "something else happened \n" );
??????? }
??? }
}
int main( int argc, char* argv[] )
{
??? if( argc <= 2 )
??? {
??????? printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
??????? return 1;
??? }
??? const char* ip = argv[1];
??? int port = atoi( argv[2] );
??? int ret = 0;
??? struct sockaddr_in address;
??? bzero( &address, sizeof( address ) );
??? address.sin_family = AF_INET;
??? inet_pton( AF_INET, ip, &address.sin_addr );
??? address.sin_port = htons( port );
??? int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
??? assert( listenfd >= 0 );
??? ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
??? assert( ret != -1 );
??? ret = listen( listenfd, 5 );
??? assert( ret != -1 );
??? epoll_event events[ MAX_EVENT_NUMBER ];
??? int epollfd = epoll_create( 5 );
??? assert( epollfd != -1 );
??? addfd( epollfd, listenfd, true );
??? while( 1 )
??? {
??????? int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
??????? if ( ret < 0 )
??????? {
??????????? printf( "epoll failure\n" );
??????????? break;
??????? }
??????? //測試不同模式
??????? lt( events, ret, epollfd, listenfd );
??????? et( events, ret, epollfd, listenfd );
??? }
??? close( listenfd );
??? return 0;
}