IO復用
多進程/線程并發模型,為每個sockets分配一個進程/線程
I/O(多路)復用,采用單個進/線程就可以管理多個socket
I/O復用有3種方案:
- select
- poll
- epoll
select
I/O多路復用詳解
27、fd_set與FD_SETSIZE詳解
詳解fd_set結構體
fd_set結構體
#include <sys/select.h>#define FD_SETSIZE 1024
#define NFDBITS (8 * sizeof(unsigned long))
#define __FDSET_LONGS (FD_SETSIZE/NFDBITS)typedef struct {unsigned long fds_bits[__FDSET_LONGS];
} fd_set;或者typedef struct{long int fds_bits[32];
}fd_set;
fd_set
是文件描述符 fd
的集合,由于每個進程可打開的文件描述符默認值為1024,fd_set
可記錄的 fd
個數上限也是1024個
fd_set
采用位圖 bitmap
結構,是一個大小為32的 long 型數組,每一個 bit 代表一個描述符是否被監視(類似于一個32x32的矩陣)
操作函數
#include <sys/select.h>
#include <sys/time.h>
void FD_SET(int fd, fd_set *fdset);
void FD_CLR(int fd, fd_set *fdset);
void FD_ISSET(int fd, fd_set *fdset);
void FD_ZERO(fd_set *fdset);
FD_ZERO(&fdset); /將set清零使集合中不含任何fd,清空fdset與所有文件句柄的聯系/
FD_SET(fd, &fdset); /將fd加入set集合,建立文件句柄fd與fdset的聯系/
FD_CLR(fd, &fdset); /將fd從set集合中清除,清除文件句柄fd與fdset的聯系/
FD_ISSET(fd, &fdset); /在調用select()函數后,用FD_ISSET來檢測fd是否在set集合中,當檢測到fd在set中則返回真,否則,返回假(0)/
select()函數
// nfds:fds中最大fd的值加1
// readfds: 讀數據文件描述符集合
// writefds: 寫數據文件描述符集合
// exceptfds: 異常情況的文件描述符集合
// timeout: 該方法阻塞的超時時間
int select (int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);struct timeval {long tv_sec; //秒long tv_usec; //毫秒
}
- 用戶進程通過
select
系統調用把fd_set
結構的數據拷貝
到內核,由內核來監視并判斷哪些連接有數據到來,如果有連接準備好數據,select
系統調用就返回 select
返回后,用戶進程只知道某個或某幾個連接有數據,但并不知道是哪個連接。所以需要遍歷
fds
中的每個fd
, 當該fd
被置位時,代表該fd
表示的連接有數據需要被讀取。然后我們讀取該fd
的數據并進行業務操作select
第一個參數需要傳入最大fd值加1
的數值,目的是為了用戶能自定義監視的fd
范圍,防止不必要資源消耗- 操作系統會復用用戶進程傳入的
fd_set
變量,來作為出參,所以我們傳入的fd_set
返回時已經被內核修改
過了 select
的方式選擇讓內核來幫我們監視這些fd
,當有數據可讀時就通知我們,避免listenfd在accept()時阻塞
,提升了效率
返回值:
- **>0:**有事件發生
- **=0:**timeout,超時
- **<0:**出錯
示例程序
-
tcpseletc.cpp
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <arpa/inet.h> #include <sys/fcntl.h>// 初始化服務端的監聽端口。 int initserver(int port);int main(int argc,char *argv[]) {if (argc != 2){printf("usage: ./tcpselect port\n"); return -1;}// 初始化服務端用于監聽的socket。int listensock = initserver(atoi(argv[1]));printf("listensock=%d\n",listensock);if (listensock < 0){printf("initserver() failed.\n"); return -1;}fd_set readfdset; // 讀事件的集合,包括監聽socket和客戶端連接上來的socket。int maxfd; // readfdset中socket的最大值。// 初始化結構體,把listensock添加到集合中。FD_ZERO(&readfdset);FD_SET(listensock,&readfdset);maxfd = listensock;while (1){// 調用select函數時,會改變socket集合的內容,所以要把socket集合保存下來,傳一個臨時的給select。fd_set tmpfdset = readfdset;int infds = select(maxfd+1,&tmpfdset,NULL,NULL,NULL);// printf("select infds=%d\n",infds);// 返回失敗。if (infds < 0){printf("select() failed.\n"); perror("select()"); break;}// 超時,在本程序中,select函數最后一個參數為空,不存在超時的情況,但以下代碼還是留著。if (infds == 0){printf("select() timeout.\n"); continue;}// 檢查有事情發生的socket,包括監聽和客戶端連接的socket。// 這里是客戶端的socket事件,每次都要遍歷整個集合,因為可能有多個socket有事件。for (int eventfd=0; eventfd <= maxfd; eventfd++){if (FD_ISSET(eventfd,&tmpfdset)<=0) continue; //判斷時用tmpfdset集合if (eventfd==listensock){ // 如果發生事件的是listensock,表示有新的客戶端連上來。struct sockaddr_in client;socklen_t len = sizeof(client);int clientsock = accept(listensock,(struct sockaddr*)&client,&len);if (clientsock < 0){printf("accept() failed.\n"); continue;}printf ("client(socket=%d) connected ok.\n",clientsock);// 把新的客戶端socket加入集合,readfdset集合,注意區別何時用tmpfdset何時用readfdsetFD_SET(clientsock,&readfdset);if (maxfd < clientsock) maxfd = clientsock;continue;}else{// 客戶端有數據過來或客戶端的socket連接被斷開。char buffer[1024];memset(buffer,0,sizeof(buffer));// 讀取客戶端的數據。ssize_t isize=read(eventfd,buffer,sizeof(buffer));// 發生了錯誤或socket被對方關閉。if (isize <=0){printf("client(eventfd=%d) disconnected.\n",eventfd);close(eventfd); // 關閉客戶端的socket。FD_CLR(eventfd,&readfdset); // 從readfdset集合中移去客戶端的socket。// 重新計算maxfd的值,注意,只有當eventfd==maxfd時才需要計算。if (eventfd == maxfd){for (int ii=maxfd;ii>0;ii--){if (FD_ISSET(ii,&readfdset)){maxfd = ii; break;}}printf("maxfd=%d\n",maxfd);}continue;}printf("recv(eventfd=%d,size=%d):%s\n",eventfd,isize,buffer);// 把收到的報文發回給客戶端。write(eventfd,buffer,strlen(buffer));}}}return 0; }// 初始化服務端的監聽端口。 int initserver(int port) {int sock = socket(AF_INET,SOCK_STREAM,0);if (sock < 0){printf("socket() failed.\n"); return -1;}// Linux如下int opt = 1; unsigned int len = sizeof(opt);setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,len);setsockopt(sock,SOL_SOCKET,SO_KEEPALIVE,&opt,len);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(port);if (bind(sock,(struct sockaddr *)&servaddr,sizeof(servaddr)) < 0 ){printf("bind() failed.\n"); close(sock); return -1;}if (listen(sock,5) != 0 ){printf("listen() failed.\n"); close(sock); return -1;}return sock; }
select缺陷與不足
- 可監控的文件描述符數量最大為 1024 個,就代表最大能支持的并發為1024,這個是操作系統內核決定的
- 用戶進程的文件描述符集合
fd_set
每次都需要從用戶進程拷貝到內核,有一定的性能開銷 select
函數返回,我們只知道有文件描述符滿足要求,但不知道是哪個,所以需要遍歷所有文件描述符,復雜度為O(n)select
機制的這些特性在高并發網絡服務器動輒幾萬幾十萬并發連接的場景下是低效的
poll
poll
是另一種I/O多路復用的實現方式,它解決了 select
1024個文件描述符的限制問題
poll
是使用 pollfd
結構來替代了 select
的 fd_set
位圖,以解決 1024 的文件描述符個數限制
struct pollfd
{int fd; /* file descriptor */short events; /* requested events */short revents; /* returned events */
};
fd
表示要監視的文件描述符events
表示要監視的事件,比如輸入、輸出或異常revents
表示返回的標志位,標識哪個事件有信息到來,處理完成后記得重置標志位
poll
函數的定義
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
poll
函數的第一個參數傳入了一個自定義的pollfd
的數組,原則上已經沒有了個數的限制- 但
poll
除了解決了select
存在的文件描述符個數的限制,并沒有解決select
存在的其他問題(拷貝
和輪詢
) select
和poll
都會隨著監控的文件描述符數量增加而性能下降,因此也不太適合高并發場景
epoll
epoll
使用一個文件描述符管理多個描述符,省去了大量文件描述符頻繁在用戶態和內核態之間拷貝的資源消耗
epoll
操作過程有三個非常重要的接口
epoll_create()函數
/* Creates an epoll instance. Returns an fd for the new instance.The "size" parameter is a hint specifying the number of filedescriptors to be associated with the new instance. The fdreturned by epoll_create() should be closed with close(). */
extern int epoll_create (int __size) __THROW;/* Same as epoll_create but with an FLAGS parameter. The unused SIZEparameter has been dropped. */
extern int epoll_create1 (int __flags) __THROW;
epoll_create()
方法生成一個 epoll
專用的文件描述符(創建一個 epoll
的句柄)
參數 size
在新版本中沒有具體意義,填一個大于0的任意值即可
epoll_ctl()函數
/* Manipulate an epoll instance "epfd". Returns 0 in case of success,-1 in case of error ( the "errno" variable will contain thespecific error code ) The "op" parameter is one of the EPOLL_CTL_*constants defined above. The "fd" parameter is the target of theoperation. The "event" parameter describes which events the calleris interested in and any associated user data. */
extern int epoll_ctl (int __epfd, int __op, int __fd,struct epoll_event *__event) __THROW;
epfd
:epoll
專用的文件描述符,epoll_create()
的返回值op
:表示添加、修改、刪除的動作,用三個宏來表示:
/* Valid opcodes ( "op" parameter ) to issue to epoll_ctl(). */
#define EPOLL_CTL_ADD 1 /* Add a file descriptor to the interface. */
#define EPOLL_CTL_DEL 2 /* Remove a file descriptor from the interface. */
#define EPOLL_CTL_MOD 3 /* Change file descriptor epoll_event structure. */
fd
:需要監聽的文件描述符event
:告訴內核要監聽的事件
epoll_event結構體
前端 詳解epoll_events結構體
typedef union epoll_data
{void *ptr;int fd;uint32_t u32;uint64_t u64;
} epoll_data_t;struct epoll_event
{uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;
定義了枚舉類型的events
enum EPOLL_EVENTS{EPOLLIN = 0x001,
#define EPOLLIN EPOLLINEPOLLPRI = 0x002,
#define EPOLLPRI EPOLLPRIEPOLLOUT = 0x004,
#define EPOLLOUT EPOLLOUTEPOLLRDNORM = 0x040,
#define EPOLLRDNORM EPOLLRDNORMEPOLLRDBAND = 0x080,
#define EPOLLRDBAND EPOLLRDBANDEPOLLWRNORM = 0x100,
#define EPOLLWRNORM EPOLLWRNORMEPOLLWRBAND = 0x200,
#define EPOLLWRBAND EPOLLWRBANDEPOLLMSG = 0x400,
#define EPOLLMSG EPOLLMSGEPOLLERR = 0x008,
#define EPOLLERR EPOLLERREPOLLHUP = 0x010,
#define EPOLLHUP EPOLLHUPEPOLLRDHUP = 0x2000,
#define EPOLLRDHUP EPOLLRDHUPEPOLLEXCLUSIVE = 1u << 28,
#define EPOLLEXCLUSIVE EPOLLEXCLUSIVEEPOLLWAKEUP = 1u << 29,
#define EPOLLWAKEUP EPOLLWAKEUPEPOLLONESHOT = 1u << 30,
#define EPOLLONESHOT EPOLLONESHOTEPOLLET = 1u << 31
#define EPOLLET EPOLLET};
epoll_wait()函數
/* Wait for events on an epoll instance "epfd". Returns the number oftriggered events returned in "events" buffer. Or -1 in case oferror with the "errno" variable set to the specific error code. The"events" parameter is a buffer that will contain triggeredevents. The "maxevents" is the maximum number of events to bereturned ( usually size of "events" ). The "timeout" parameterspecifies the maximum wait time in milliseconds (-1 == infinite).This function is a cancellation point and therefore not marked with__THROW. */
extern int epoll_wait (int __epfd, struct epoll_event *__events,int __maxevents, int __timeout);
epoll_wait()
方法等待事件的產生,類似 select
調用
epfd
:epoll
專用的文件描述符,epoll_create()
的返回值events
:分配好的epoll_event
結構體數組,epoll
將會把發生的事件賦值到events
數組中maxevents
:告訴內核events
數組的大小timeout
:超時時間,單位毫秒,為 -1 時,方法為阻塞
值得注意的是epoll_wait()
函數只能獲取是否有注冊事件發生,至于這個事件到底是什么、從哪個 socket 來、發送的時間、包的大小等等信息,統統不知道。這就好比一個人在黑黢黢的山洞里,只能聽到聲響,至于這個聲音是誰發出的根本不知道。因此我們就需要struct epoll_event
來幫助我們讀取信息
實例
-
tcpepoll.cpp
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <errno.h> #include <unistd.h> #include <fcntl.h> #include <arpa/inet.h> #include <netinet/in.h> #include <sys/epoll.h> #include <sys/socket.h> #include <sys/types.h>#define MAXEVENTS 100// 把socket設置為非阻塞的方式。 int setnonblocking(int sockfd);// 初始化服務端的監聽端口。 int initserver(int port);int main(int argc, char *argv[]) {if (argc != 2){printf("usage:./tcpepoll port\n");return -1;}// 初始化服務端用于監聽的socket。int listensock = initserver(atoi(argv[1]));printf("listensock=%d\n", listensock);if (listensock < 0){printf("initserver() failed.\n");return -1;}int epollfd;char buffer[1024];memset(buffer, 0, sizeof(buffer));// 創建一個描述符epollfd = epoll_create(1);// 添加監聽描述符事件struct epoll_event ev;ev.data.fd = listensock;ev.events = EPOLLIN;epoll_ctl(epollfd, EPOLL_CTL_ADD, listensock, &ev);while (1){struct epoll_event events[MAXEVENTS]; // 存放有事件發生的結構數組。// 等待監視的socket有事件發生。int infds = epoll_wait(epollfd, events, MAXEVENTS, -1);// printf("epoll_wait infds=%d\n",infds);// 返回失敗。if (infds < 0){printf("epoll_wait() failed.\n");perror("epoll_wait()");break;}// 超時。if (infds == 0){printf("epoll_wait() timeout.\n");continue;}// 遍歷有事件發生的結構數組。for (int ii = 0; ii < infds; ii++){if ((events[ii].data.fd == listensock) && (events[ii].events & EPOLLIN)){// 如果發生事件的是listensock,表示有新的客戶端連上來。struct sockaddr_in client;socklen_t len = sizeof(client);int clientsock = accept(listensock, (struct sockaddr *)&client, &len);if (clientsock < 0){printf("accept() failed.\n");continue;}// 把新的客戶端添加到epoll中。memset(&ev, 0, sizeof(struct epoll_event));ev.data.fd = clientsock;ev.events = EPOLLIN;epoll_ctl(epollfd, EPOLL_CTL_ADD, clientsock, &ev);printf("client(socket=%d) connected ok.\n", clientsock);continue;}else if (events[ii].events & EPOLLIN){// 客戶端有數據過來或客戶端的socket連接被斷開。char buffer[1024];memset(buffer, 0, sizeof(buffer));// 讀取客戶端的數據。ssize_t isize = read(events[ii].data.fd, buffer, sizeof(buffer));// 發生了錯誤或socket被對方關閉。if (isize <= 0){printf("client(eventfd=%d) disconnected.\n", events[ii].data.fd);// 把已斷開的客戶端從epoll中刪除。memset(&ev, 0, sizeof(struct epoll_event));ev.events = EPOLLIN;ev.data.fd = events[ii].data.fd;epoll_ctl(epollfd, EPOLL_CTL_DEL, events[ii].data.fd, &ev);close(events[ii].data.fd); //或者一行關閉命令即可continue;}printf("recv(eventfd=%d,size=%d):%s\n", events[ii].data.fd, isize, buffer);// 把收到的報文發回給客戶端。write(events[ii].data.fd, buffer, strlen(buffer));}}}close(epollfd);return 0; }// 初始化服務端的監聽端口。 int initserver(int port) {int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){printf("socket() failed.\n");return -1;}// Linux如下int opt = 1;unsigned int len = sizeof(opt);setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, len);setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &opt, len);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(port);if (bind(sock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0){printf("bind() failed.\n");close(sock);return -1;}if (listen(sock, 5) != 0){printf("listen() failed.\n");close(sock);return -1;}return sock; }// 把socket設置為非阻塞的方式。 int setnonblocking(int sockfd) {if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0) | O_NONBLOCK) == -1)return -1;return 0; }
小結
epoll
底層使用了 RB-Tree
紅黑樹和 list
鏈表實現。內核創建了紅黑樹用于存儲 epoll_ctl
傳來的 socket,另外創建了一個 list
鏈表,用于存儲準備就緒的事件
當 epoll_wait
調用時,僅僅觀察這個 list 鏈表里有沒有數據即可。有數據就返回,沒有數據就阻塞。所以,epoll_wait
非常高效,通常情況下即使我們要監控百萬計的連接,大多一次也只返回很少量準備就緒的文件描述符而已,所以,epoll_wait
僅需要從內核態拷貝很少的文件描述符到用戶態
epoll
相比于 select
和 poll
,它更高效的本質在于:
- 減少了用戶態和內核態文件描述符狀態的拷貝,
epoll
只需要一個專用的文件句柄即可 - 減少了文件描述符的遍歷,
select
和poll
每次都要遍歷所有的文件描述符,用來判斷哪個連接準備就緒;epoll
返回的是準備就緒的文件描述符,效率大大提高 - 沒有并發數量的限制,性能不會隨文件描述符數量的增加而下降
IO復用總結
select
是較早實現的一種I/O多路復用技術,但它最明顯的缺點就是有 1024 個文件描述符數量的限制,也就導致它無法滿足高并發的需求
poll
一定程度上解決了 select
文件描述符數量的限制,但和 select
一樣,仍然存在文件描述符狀態在用戶態和內核態的頻繁拷貝,和遍歷所有文件描述符的問題,這導致了在面對高并發的實現需求時,它的性能不會很高
epoll
高效地解決了以上問題,首先使用一個特殊的文件描述符,解決了用戶態和內核態頻繁拷貝的問題;其次 epoll_wait
返回的是準備就緒的文件描述符,省去了無效的遍歷;再次,底層使用紅黑樹和鏈表的數據結構,更加高效地實現連接的監視
工作中常用的 redis、nginx 都是使用了 epoll
這種I/O復用模型,通過單線程就實現了10萬以上的并發訪問
epoll
不一定任何情況下都比 select
高效,需要根據具體場景。比如并發不是很高,且大部分都是活躍的 socket,那么也許 select
會比 epoll
更加高效,因為 epoll
會有更多次的系統調用,用戶態和內核態會有更加頻繁的切換