在上一節利用管道實現了一個簡單的聊天室,但這個聊天室有一個很明顯的問題就是,當A處于讀阻塞情況下是不能向B發送消息的,只有收到B的消息才能發送。如何實現同時既能接受B的消息,又能向其發送消息?
很遺憾,依靠基本的編程思維似乎無法解決這個問題。因為當A處于讀阻塞狀態時,程序是不可能往下執行的。那么現在的聊天軟件又是如何實現同時收發消息的?這個時候,我們就需要把問題交給OS幫我們來解決,操作系統的內核通過控制底層操作幫助我們實現了一些邏輯上的”并行“,也就是我們今天所說的 IO多路復用
什么是IO多路復用?
IO多路復用(I/O Multiplexing)是一種同時監控多個文件描述符(socket、管道、文件等)的IO狀態的機制。當其中任意一個或多個文件描述符就緒(可讀、可寫或異常)時,內核會通知應用程序,從而避免阻塞等待單個IO操作。
在傳統阻塞IO模型中,每個IO操作(如read
/write
)會阻塞線程直到完成。若需處理多個連接(如Web服務器),必須為每個連接創建一個線程/進程,導致資源浪費(線程上下文切換、內存占用)。
IO多路復用通過單線程監控多個IO事件,實現高并發、低資源消耗。
當聊天室使用了IO多路復用,就可以同時監控讀和寫對應的文件描述符,任何一個文件描述符就緒就會立即響應,然后繼續輪詢等待,從而實現了邏輯上的“并行”。
核心機制與系統調用
IO多路復用包含兩種系統調用,select與epoll。他們之間的實現方式是完全不同的
select
底層實現
維護一個位圖(fd集合),每次調用需遍歷所有fd檢查就緒狀態,輪詢和通知由OS完成。
位圖機制:通過一個固定大小的位圖(
fd_set
)來管理文件描述符(fd)。每個 fd 占用一個位,最大支持的 fd 數量通常為 1024。支持跨平臺(POSIX標準)。
線性掃描:每次調用時,內核會遍歷所有 fd,檢查它們是否就緒。時間復雜度為 O(n),其中 n 是最大 fd 數。fd數量增加時性能下降。
每次調用都需要重新傳遞 fd 集合:調用時需要將用戶態的 fd 集合拷貝到內核態,內核處理后再次拷貝回用戶態,效率較低。
使用流程
需要使用的系統調用:
#include <sys/select.h>
#include <sys/time.h>
//readset、writeset、exceptionset都是fd_set集合
//集合的相關操作如下:
void FD_ZERO(fd_set *fdset); /* 將所有fd清零 */
void FD_SET(int fd, fd_set *fdset);/* 增加一個fd */
void FD_CLR(int fd, fd_set *fdset);/* 刪除一個fd */
int FD_ISSET(int fd, fd_set *fdset);/* 判斷一個fd是否有設置 */
int select(int maxfd, fd_set *readset,fd_set *writeset, fd_set *exceptionset,
struct timeval * timeout);
這里先簡要地介紹一下 select 使用流程:
- 首先,需要先為監聽集合申請內存;
- 使用 FD_ZERO 初始化監聽集合;
- 將所有需要監聽的文件描述符使用 FD_SET?加入監聽集合;
- 調用 select 系統調用使進程陷入阻塞狀態;
- 從阻塞當中被喚醒以后,使用 FD_ISSET 遍歷所有監聽的文件描述符,找到真正就緒的文件描述符;
- 對就緒的文件描述符執行IO操作。
存在的問題:
- 每調用一次select 就需要3個事件類型的fd_set需從用戶空間拷貝到內核空間去,返回時select也會把保留了活躍事件的fd_set返回(從內核拷貝到用戶空間)。當fd_set數據大的時候,這個過程消耗是很大的。
- select需要逐個遍歷fd_set集合 ,然后去檢查對應fd的可讀寫狀態,如果fd_set 數據量多,那么遍歷fd_set 就是一個比較耗時的過程。
- fd_set是個集合類型的數據結構有長度限制,32位系統長度1024,64位系統長度2048,這個就限制了select最多能同時監控1024個連接。
系統調用
FR_ZERO
清空一個文件描述符集合。
void FD_ZERO(fd_set *fdset);fd_set readfds;
FD_ZERO(&readfds); // 清空集合
fdset
:指向fd_set
類型的指針,表示要清空的文件描述符集合。- 將
fd_set
中的所有位清零,表示集合中沒有任何文件描述符被設置。
FD_SET
將一個文件描述符加入到集合中。
void FD_SET(int fd, fd_set *fdset);fd_set readfds;
FD_ZERO(&readfds); // 清空集合
FD_SET(sockfd1, &readfds); // 將 sockfd1 加入集合
FD_SET(sockfd2, &readfds); // 將 sockfd2 加入集合
fd
:要加入集合的文件描述符。fdset
:指向fd_set
類型的指針,表示要操作的文件描述符集合。- 將指定的文件描述符
fd
設置為 1,表示該文件描述符被加入到集合中。
select
監控多個文件描述符的可讀、可寫和異常狀態。
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
nfds
:需要監控的最大文件描述符值加 1(即maxfd + 1
)。OS會從0~nfds的范圍內的輪詢,從而減少不必要的監聽(nfds+1~1024的位圖被忽略)readfds
:指向fd_set
類型的指針,表示要監控的可讀文件描述符集合。writefds
:指向fd_set
類型的指針,表示要監控的可寫文件描述符集合。exceptfds
:指向fd_set
類型的指針,表示要監控的異常狀態文件描述符集合。timeout
:指向struct timeval
類型的指針,表示超時時間。如果為NULL
,表示阻塞等待;如果為{0, 0}
,表示非阻塞。
返回值:
大于 0:表示就緒的文件描述符數量。
等于 0:表示超時,沒有任何文件描述符就緒。
小于 0:表示出錯。
select
函數會阻塞當前線程,直到集合中的某個文件描述符就緒(可讀、可寫或異常)或超時。如果某個文件描述符就緒,select
會返回就緒的文件描述符數量,并修改對應的集合。
示例:
fd_set readfds;
struct timeval tv;FD_ZERO(&readfds);
FD_SET(sockfd1, &readfds);
FD_SET(sockfd2, &readfds);tv.tv_sec = 5; // 設置超時時間為 5 秒
tv.tv_usec = 0;int ret = select(sockfd2 + 1, &readfds, NULL, NULL, &tv);
if (ret > 0) {if (FD_ISSET(sockfd1, &readfds)) {printf("sockfd1 is ready for reading\n");}if (FD_ISSET(sockfd2, &readfds)) {printf("sockfd2 is ready for reading\n");}
} else if (ret == 0) {printf("Timeout occurred\n");
} else {printf("Error occurred\n");
}
FD_ISSET
檢查一個文件描述符是否在集合中。
int FD_ISSET(int fd, const fd_set *fdset);
fd
:要檢查的文件描述符。fdset
:指向fd_set
類型的指針,表示要檢查的文件描述符集合。非零:表示文件描述符
fd
在集合中。零:表示文件描述符
fd
不在集合中。
檢查指定的文件描述符 fd
是否被設置為 1,即是否在集合中。
FD_CLR
FD_CLR
的主要功能是從一個文件描述符集合中移除一個指定的文件描述符。
當某個文件描述符不再需要被監控時(例如,關閉了某個 socket)。
在
select
調用后,需要清理某些不再需要的文件描述符。
void FD_CLR(int fd, fd_set *fdset);
fd
:要從集合中移除的文件描述符。fdset
:指向fd_set
類型的指針,表示要操作的文件描述符集合。
實戰:使用select對于基于管道的簡易聊天程序進行改進:
基于上節我們通過管道實現的簡易聊天程序,我們對其進行改進實現同時收發消息:
//客戶端Aint main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");int fdw = open(argv[2], O_RDWR);ERROR_CHECK(fdr, -1, "open fdw error");printf("connected\n");char buf[1024];fd_set rdset;while (1){FD_ZERO(&rdset);FD_SET(STDIN_FILENO, &rdset);FD_SET(fdr, &rdset);select(fdr+1, &rdset, NULL, NULL, NULL);if(FD_ISSET(STDIN_FILENO, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(STDIN_FILENO, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");write(fdw, buf, strlen(buf));}if(FD_ISSET(fdr, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(fdr, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");if(ret == 0){printf("B is disconnected\n");break;}printf("B:%s", buf);}}return 0;
}//客戶端Bint main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdw = open(argv[1], O_RDWR);ERROR_CHECK(fdw, -1, "open fdw error");int fdr = open(argv[2], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");printf("connected\n");char buf[1024];fd_set rdset;while (1){FD_ZERO(&rdset);FD_SET(STDIN_FILENO, &rdset);FD_SET(fdr, &rdset);select(fdr+1, &rdset, NULL, NULL, NULL);if(FD_ISSET(STDIN_FILENO, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(STDIN_FILENO, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");write(fdw, buf, strlen(buf));}if(FD_ISSET(fdr, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(fdr, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");if(ret == 0){printf("A is disconnected\n");break;}printf("A:%s", buf);}}return 0;
}
輸出結果:
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./selectA 1.pipe 2.pipe
waiting for connect
connected
hello!
B:who are you?
B:what are you doing?
I am A
I am eat dinner?
goodbye!
^Cubuntu@ubuntu:~/MyProject/Linux/IO$ ./selectB 1.pipe 2.pipe
waiting for connect
connected
A:hello!
who are you?
what are you doing?
A:I am A
A:I am eat dinner?
A:goodbye!
A is disconnected
可以看到,A和B可以同時收發消息,無需等待收到消息之后再發送
epoll(Linux特有)
在早期計算機網絡并不發達,所以并發網絡請求并不會很高,select模型也足夠使用了,但是隨著網絡的高速發展,高并發的網絡請求程序越來越多,而select模式下 fd_set 長度限制就開始成為了致命的缺陷。下圖顯示了隨著并發量的提升,不同IO多路復用機制的響應速度。
顯然,根據select的底層實現,不難發現它有如下缺陷:
- 位圖靠數組實現,當改變長度需要重新編譯
- 每次從內核態讀取就緒集合,和重新將文件描述符放入集合會產生大量的內核態和用戶態之間的冗余拷貝
- 監聽集合和就緒集合的耦合度高
- 就緒集合的處理性能低
吸取了select的教訓,epoll模式就不再使用數組的方式來保存自己所監控的fd信息了,epoll 可以在內核態空間當中維持兩個數據結構:監聽事件集合和就緒事件隊列。
監聽事件集合用來存儲所有需要關注的設備(即文件描述符)和對應操作(比如讀、寫、掛起和異常等等),當監聽的設備有事件產生時,比如網卡上接收到了數據并傳輸到了緩沖區當中時,硬件會采用中斷等方式通知操作系統,操作系統會將就緒事件拷貝到就緒事件隊列中,并且找到阻塞在 epoll_wait 的線程,讓其就緒。監聽事件集合通常是一個紅黑樹,就緒事件隊列是一個線性表。
底層實現
紅黑樹 + 就緒鏈表:使用紅黑樹管理所有注冊的 fd,當 fd 就緒時,將其加入就緒鏈表。時間復雜度為 O(1)。
- 邊緣觸發(ET)和水平觸發(LT):
ET:僅通知一次,需一次性處理完所有數據(減少事件觸發次數,高效但需非阻塞IO)。
LT:默認模式,fd就緒后,若未處理完,下次
epoll_wait
仍會通知。
事件驅動:內核維護一個事件表,只返回已經就緒的 fd,無需每次遍歷所有 fd。
無需重復傳遞 fd 集合:通過
epoll_ctl
動態管理 fd,無需在每次調用時重新傳遞 fd 集合。優勢:
時間復雜度O(1):僅返回就緒的fd,無需遍歷。
無fd數量限制:理論上僅受系統內存限制。
高效:通過
epoll_ctl
注冊/修改事件,避免每次調用時重復傳遞fd集合。
有了這些優勢之后, epoll 逐漸取代了 select 的市場地位,尤其是在管理巨大量連接的高
并發場景中, epoll 的性能要遠超 select 。
使用流程
需要使用的系統調用
#include<sys/epoll.h>
int epoll_create(int size); //創建 epoll 實例
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); //注冊文件描述符
//等待事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);//用于描述 epoll 就緒的事件及其關聯數據。
struct epoll_event {uint32_t events; //表示文件描述符上發生的事件類型。EPOLLIN 表示讀, EPOLLOUT 表示寫epoll_data_t data; //存儲與事件相關的數據,具體類型由用戶決定。
};//用于存儲就緒事件中與事件相關的不同類型的數據。
typedef union epoll_data {void*ptr;int fd; //存儲就緒事件對應的文件描述符uint32_t u32;uint64_t u64;
} epoll_data_t;
events 是一個 32 位的無符號整數,用于表示文件描述符上發生的事件類型。它可以是一個或多個事件標志的組合(通過位或操作)。常見的事件類型包括:
EPOLLIN:表示文件描述符可讀。
EPOLLOUT:表示文件描述符可寫。
EPOLLRDHUP:表示對端關閉連接(僅適用于 TCP 套接字)。
EPOLLPRI:表示有緊急數據可讀。
EPOLLERR:表示發生錯誤。
EPOLLHUP:表示掛起(文件描述符關閉)。
EPOLLET:表示邊緣觸發模式(Edge-Triggered)。
EPOLLONESHOT:表示一次性事件,事件處理完成后需要重新注冊。
創建?
epoll
?實例:使用epoll_create
或epoll_create1
創建一個epoll
文件描述符(epfd
)。這個文件描述符用于后續的epoll
操作。注冊文件描述符:使用
epoll_ctl
將需要監控的文件描述符(如 socket)注冊到epoll
實例中,并指定感興趣的事件(如可讀、可寫)。等待事件:使用
epoll_wait
等待epoll
實例中的事件。epoll_wait
會阻塞當前線程,直到有文件描述符就緒或超時。處理事件:當
epoll_wait
返回時,它會返回就緒的文件描述符數量(nfds
),并填充events
數組。程序可以遍歷events
數組,處理每個就緒的文件描述符。清理資源:當不再需要
epoll
實例時,可以關閉epoll
文件描述符,釋放相關資源。
?系統調用
epoll_create 和 epoll_create1
使用 epoll_create
或 epoll_create1
創建一個 epoll
文件描述符(epfd
)。這個文件描述符用于后續的 epoll
操作。
int epoll_create(int size);
int epoll_create1(int flags);
size
:建議的初始文件描述符數量(一般選擇1即可)。flags
:可以設置一些標志,如EPOLL_CLOEXEC
(設置文件描述符為關閉執行)。成功時返回一個有效的
epoll
文件描述符(非負整數)。失敗時返回?
-1
,并設置?errno
?以指示錯誤原因。
?epoll_ctl
使用 epoll_ctl
將需要監控的文件描述符(如 socket)注冊到 epoll
實例中,并指定感興趣的事件(如可讀、可寫)。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epfd
:epoll
實例的文件描述符。op
:操作類型,可以是EPOLL_CTL_ADD
(添加)、EPOLL_CTL_MOD
(修改)或EPOLL_CTL_DEL
(刪除)。fd
:要操作的文件描述符。event
:指向epoll_event
結構的指針,包含要監控的事件和附加數據。成功時返回
0
。失敗時返回
-1
,具體錯誤碼可以通過errno
獲取。
epoll_wait
使用 epoll_wait
等待 epoll
實例中的事件。epoll_wait
會阻塞當前線程,直到有文件描述符就緒或超時。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
epfd
:epoll
實例的文件描述符。events
:指向epoll_event
數組的指針,用于存儲就緒的事件。maxevents
:events
數組的最大容量,等同于插入就緒的文件描述符數量。timeout
:超時時間(單位為毫秒),-1
表示阻塞等待,0
表示非阻塞。返回值 > 0:表示有就緒的文件描述符,返回值為就緒的文件描述符數量。
返回值 == 0:表示超時,沒有任何文件描述符就緒。
返回值 < 0:表示發生錯誤,具體錯誤碼可以通過
errno
獲取。
示例:
int main(int argc, char const *argv[])
{int epfd = epoll_create1(0);ERROR_CHECK(epfd, -1, "epoll_create")int sockfd = socket(AF_INET, SOCK_STREAM, 0);ERROR_CHECK(sockfd, -1, "socket");struct epoll_event ev;ev.events = EPOLLIN; // 監聽可讀事件ev.data.fd = sockfd; // 存儲文件描述符int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: add");struct epoll_event events[10];int nfds = epoll_wait(epfd, events, 10, -1);ERROR_CHECK(nfds, -1, "epoll_wait");char buf[1024];for (int i = 0; i < nfds; i++) {if (events[i].events & EPOLLIN) {int fd = events[i].data.fd; // 獲取文件描述符// 處理可讀事件read(fd, buf, sizeof(buf));}}return 0;
}
?實戰:使用epoll對于基于管道的簡易聊天程序進行改進
//客戶A
//客戶端A
#include<54func.h>
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");int fdw = open(argv[2], O_RDWR);ERROR_CHECK(fdr, -1, "open fdw error");printf("connected\n");int epfd = epoll_create(1);ERROR_CHECK(epfd, -1, "epoll_create");struct epoll_event ev, evs[2];ev.events = EPOLLIN; ev.data.fd = STDIN_FILENO;int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdr");ev.data.fd = fdr; ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fdr, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdw");char buf[1024];int readyNum = 0;while (1){readyNum = epoll_wait(epfd, evs, 2, -1);for(int i = 0; i < readyNum; i++){if(evs[i].data.fd == STDIN_FILENO){memset(buf, 0, sizeof(buf));read(STDIN_FILENO, buf, sizeof(buf));write(fdw, buf, strlen(buf));}else if(evs[i].data.fd == fdr){memset(buf, 0, sizeof(buf));int sret = read(fdr, buf, sizeof(buf));if(sret == 0){printf("B is disconnected\n");return 0;}printf("B:%s",buf);}}}return 0;
}
//客戶B
//客戶端B
#include<54func.h>
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdw = open(argv[1], O_RDWR);ERROR_CHECK(fdw, -1, "open fdw error");int fdr = open(argv[2], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");printf("connected\n");int epfd = epoll_create(1);ERROR_CHECK(epfd, -1, "epoll_create");struct epoll_event ev, evs[2];ev.events = EPOLLIN; ev.data.fd = STDIN_FILENO;int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdr");ev.data.fd = fdr; ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fdr, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdw");char buf[1024];int readyNum = 0;while (1){readyNum = epoll_wait(epfd, evs, 2, -1);for(int i = 0; i < readyNum; i++){if(evs[i].data.fd == STDIN_FILENO){memset(buf, 0, sizeof(buf));read(STDIN_FILENO, buf, sizeof(buf));write(fdw, buf, strlen(buf));}else if(evs[i].data.fd == fdr){memset(buf, 0, sizeof(buf));int sret = read(fdr, buf, sizeof(buf));if(sret == 0){printf("A is disconnected\n");return 0;}printf("A:%s",buf);}}}return 0;
}
輸出結果:
//客戶A
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./epollA 1.pipe 2.pipe
waiting for connect
connected
hello I am A
who are you?
B:I am B
Am I alone?
B:You are not alone.
Someboday tell me, Why it feel more real when I dream than truth?
B:There is some fiction in your truth, and some truth in your fiction.
B is disconnected
//客戶B
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./epollB 1.pipe 2.pipe
waiting for connect
connected
A:hello I am A
A:who are you?
I am B
A:Am I alone?
You are not alone.
A:Someboday tell me, Why it feel more real when I dream than truth?
There is some fiction in your truth, and some truth in your fiction.
^C
epoll的邊緣觸發
epoll_wait 的就緒觸發有兩種方式:一種是默認的水平觸發方式(Level-triggered),另一種是邊緣觸發模式(Edge-triggered)。以讀事件為例子:水平觸發模式下,只要緩沖區當中存在數據,就可以使 epoll_wait 就緒;在邊緣觸發的情況下,如果緩沖區中存在數據,但是數據一直沒有增多,那么 epoll_wait 就不會就緒,只有緩沖區的數據增多的時候,即下圖中綠色的上升沿部分時,才能使 epoll_wait 就緒。
使用水平觸發的話,線程能夠以更短的響應時間來處理事件,但是這可能會導致饑餓問題,如果存在某個事件傳輸的數據量過大,那么線的epoll_wait就會多次就緒直到處理完所有數據為止,而一些其他的任務所占用的資源就會相對變少而一直無法得到響應。使用邊緣觸發可以避免這個問題。為了確保讀操作可以將所有數據讀完,可以考慮使用循環配合非阻塞的形式來處理。
在線程池架構中,主線程通常會將實際的IO交給子線程即工作線程完成,采用邊緣觸發可以有效地降低主線程的響應頻率,提高整體的性能。除此以外,如果一次請求對應一次響應是用戶追求的通信模式,那么邊緣觸發正好符合。?
設置文件描述符為非阻塞模式
在邊緣觸發模式下,文件描述符必須設置為非阻塞模式(O_NONBLOCK
),否則可能會導致程序阻塞在 read
或 write
操作上。
我們需要使用fcntl設置文件的狀態
int flags = fcntl(sockfd, F_GETFL, 0);
ERROR_CHECK(flags, -1, "fcntl:get");int ret = fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
ERROR_CHECK(ret, -1, "fcntl: set");
設置邊緣觸發模式
在使用 epoll_ctl
注冊文件描述符時,通過在 events
字段中添加 EPOLLET
標志來啟用邊緣觸發模式。
int epfd = epoll_create(1);
ERROR_CHECK(epfd, -1, "epoll_create");
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 啟用邊緣觸發模式,監聽可讀事件
ev.data.fd = sockfd;
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev)
ERROR_CHECK(ret, -1, "epoll_ctl: add")
處理邊緣觸發事件
在邊緣觸發模式下,必須確保在每次通知后處理完所有數據。否則,如果數據沒有被完全讀取或寫入,可能會錯過后續的數據。
struct epoll_event events[10];
int nfds = epoll_wait(epfd, events, 10, -1);
ERROR_CHECK(nfds, -1, "epoll_wait");for (int i = 0; i < nfds; i++) {if (events[i].events & EPOLLIN) {char buf[1024];// 使用非阻塞讀取,確保讀取所有數據while (1) {memset(buf, 0, sizeof(buf));ssize_t ret = read(sockfd, buf, sizeof(buf));ERROR_CHECK(ret, -1, "read");if(ret == -1 || ret == 0){printf("finish\n");break;}printf("%s\n",buf);}}
}
處理所有數據
在邊緣觸發模式下,必須確保在每次通知后處理完所有數據。如果數據沒有被完全讀取或寫入,可能會錯過后續的數據。
?一次性觸發模式(EPOLLONESHOT)
如果需要在處理完事件后自動禁用該文件描述符的事件通知,可以結合 EPOLLONESHOT
標志使用。
ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT; // 啟用邊緣觸發模式和一次性觸發模式
ev.data.fd = sockfd;int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
ERROR_CHECK(ret, -1, "epoll_ctl");
使用水平觸發和邊緣觸發的效果區別:
我們先看一下對文件描述符使用阻塞和非阻塞的效果,設置邊緣觸發必須將文件描述符設置為非阻塞:
寫端:
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);int fdw = open(argv[1], O_RDWR);ERROR_CHECK(fdw, -1, "open");char buf[256];while (1){read(STDIN_FILENO, buf, sizeof(buf));write(fdw, buf, strlen(buf));}close(fdw);return 0;
}
?讀端為阻塞的情況:
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);int fdr = open(argv[1], O_RDONLY);char buf[20];while (1){memset(buf, 0, sizeof(buf));ssize_t sret = read(fdr, buf, sizeof(buf));printf("%ld %s\n", sret, buf);if(sret == 0) break;sleep(1);}close(fdr);return 0;
}
輸出結果:
//寫端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./testETW 1.pipe
hello
what are you doing?
^C
//讀端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./testET 1.pipe
6 hello
20 what are you doing?
0
可以看到,當緩沖區沒有數據時,讀端管道就會阻塞等待。只有當寫端關閉時,讀端才會變為非阻塞狀態,當寫端沒有數據時,讀端收到的是0進而退出。?
讀端為非阻塞的情況:
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open");int flags = fcntl(fdr, F_GETFL, 0);ERROR_CHECK(flags, -1, "fcntl:get");int ret = fcntl(fdr, F_SETFL, flags | O_NONBLOCK); //設置為非阻塞ERROR_CHECK(ret, -1, "fcntl:set");char buf[20];while (1){memset(buf, 0, sizeof(buf));ssize_t sret = read(fdr, buf, sizeof(buf));printf("%ld %s\n", sret, buf);if(sret == 0) break;sleep(1);}close(fdr);return 0;
}
輸出結果:
//寫端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./testETW 1.pipe
hello
who?
^C
//讀端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./testET 1.pipe
-1
-1
6 hello-1
-1
-1
-1
6 who?0
把讀端設置為非阻塞狀態時,不會因為沒有數據而等待。而是不斷去查詢,注意這里與阻塞狀態下是有區別的,當端沒有收到數據時,如果寫端未斷開,讀端會非阻塞的一直收到 sret = -1 ;當寫端斷開時,讀端才會收到 sret = 0。非阻塞模式允許程序在 I/O 操作無法完成時立即返回,從而可以快速處理其他任務,提高程序的響應速度。為什么在使用邊緣觸發時必須設置文件描述符為非阻塞模式這個問題我們先放一放,先看水平觸發和邊緣觸發的區別。
現在我們繼續觀察使用水平觸發和邊緣觸發讀取數據的區別:
為了使結果便于辨別,我們把接收緩沖區調小
使用水平觸發:
//讀端
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);printf("waiting for connect\n");int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");printf("connected\n");int epfd = epoll_create(1);ERROR_CHECK(epfd, -1, "epoll_create");struct epoll_event ev, evs[2];ev.data.fd = fdr; ev.events = EPOLLIN ;int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fdr, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdr");char buf[5];int readyNum = 0;while (1){readyNum = epoll_wait(epfd, evs, 2, -1);printf("epoll wait is ready\n");for(int i = 0; i < readyNum; i++){if(evs[i].data.fd == fdr){memset(buf, 0, sizeof(buf));int sret = read(fdr, buf, sizeof(buf));if(sret == -1){printf("B is disconnected\n");return 0;}printf("B:%s\n",buf);}}}return 0;
}//寫端
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdw = open(argv[1], O_RDWR);ERROR_CHECK(fdw, -1, "open fdw error");printf("connected\n");int epfd = epoll_create(1);ERROR_CHECK(epfd, -1, "epoll_create");struct epoll_event ev, evs[2];ev.events = EPOLLIN; ev.data.fd = STDIN_FILENO;int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdw");char buf[5];int readyNum = 0;while (1){readyNum = epoll_wait(epfd, evs, 2, -1);for(int i = 0; i < readyNum; i++){if(evs[i].data.fd == STDIN_FILENO){memset(buf, 0, sizeof(buf));read(STDIN_FILENO, buf, sizeof(buf));write(fdw, buf, strlen(buf));}}}return 0;
}
輸出結果:
//客戶端A
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETA1 1.pipe
waiting for connect
connected
epoll wait is ready
B:hello
epoll wait is ready
B:, I a
epoll wait is ready
B:m B,
epoll wait is ready
B:what
epoll wait is ready
B:is yo
epoll wait is ready
B:ur na
epoll wait is ready
B:me?
//客戶端B
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETB 1.pipe 2.pipe
waiting for connect
connected
hello, I am B, what is your name?
可以觀察到,在使用水平觸發時,我們并未一次性讀取全部數據,而是部分讀取,只要讀端有數據,?epoll 就會多次就緒,直到把數據全部取出。
現在我們使用邊緣觸發看看效果:
//讀端
int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 2);printf("waiting for connect\n");int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");int flags = fcntl(fdr, F_GETFL, 0);ERROR_CHECK(flags, -1, "fcntl:get");int ret = fcntl(fdr, F_SETFL, flags | O_NONBLOCK);ERROR_CHECK(ret, -1, "fcntl:set");printf("connected\n");int epfd = epoll_create(1);ERROR_CHECK(epfd, -1, "epoll_create");struct epoll_event ev, evs[2];ev.data.fd = fdr; ev.events = EPOLLIN | EPOLLET;ret = epoll_ctl(epfd, EPOLL_CTL_ADD, fdr, &ev);ERROR_CHECK(ret, -1, "epoll_ctl: fdr");char buf[5];int readyNum = 0;while (1){readyNum = epoll_wait(epfd, evs, 2, -1);printf("epoll wait is ready\n");for(int i = 0; i < readyNum; i++){if(evs[i].data.fd == fdr){memset(buf, 0, sizeof(buf));int sret = read(fdr, buf, sizeof(buf));if(sret == 0){printf("B is disconnected\n");return 0;}printf("B:%s\n",buf);}}}return 0;
}
輸出結果:
//讀端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETA1 1.pipe
waiting for connect
connected
epoll wait is ready
B:hello
epoll wait is ready
B:, I a
//寫端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETB1 1.pipe
waiting for connect
connected
hello, I am B, what is your name?
can you hear me?
?可以看到,使用邊緣觸發時,無論讀端是否有數據,epoll 只會在每次收到數據時就緒一次。即使端沒有一次性讀取全部數據,也要等待下一次收到數據時才能再讀取數據。
那么如何在緩沖區一次無法接收全部數據時進行多次讀取呢?這個時候我們可以使用循環,效果如下:
while (1){readyNum = epoll_wait(epfd, evs, 2, -1);printf("epoll wait is ready\n");for(int i = 0; i < readyNum; i++){if(evs[i].data.fd == fdr){while(1){memset(buf, 0, sizeof(buf));int sret = read(fdr, buf, sizeof(buf));if(sret == 0 || sret == -1){printf("finish\n");break;}printf("B:%s\n",buf);}}}}
輸出結果:
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETA1 1.pipe
waiting for connect
connected
epoll wait is ready
B:hello
B:, I a
B:m B,
B:what
B:is yo
B:ur na
B:me?finish
這樣,我們就可以在邊緣觸發下一次性讀取全部數據。
以下是我在使用邊緣觸發讀取數據的幾點疑問:
為什么使用邊緣觸發時要把文件描述符設置為非阻塞的?
我們把文件描述符設置為阻塞模式看看效果:
int fdr = open(argv[1], O_RDONLY);
ERROR_CHECK(fdr, -1, "open fdr error");
int flags = fcntl(fdr, F_GETFL, 0);
ERROR_CHECK(flags, -1, "fcntl:get");
//int ret = fcntl(fdr, F_SETFL, flags | O_NONBLOCK);
//ERROR_CHECK(ret, -1, "fcntl:set");
printf("connected\n");
?輸出結果:
//讀端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETA1 1.pipe
waiting for connect
connected
epoll wait is ready
B:hello, I am B, what
B:is your name?B:hello, I am B, what
B:is your name?
//寫端
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./ETB1 1.pipe
waiting for connect
connected
hello, I am B, what is your name?
hello, I am B, what is your name?
可以看到,在發送兩次消息時,epoll只就緒了一次,這明顯是有問題的 !原因如下:
當?read()
?讀取的數據量少于緩沖區大小時,程序無法區分是數據已讀完(需等待新事件)還是數據未讀完(需繼續讀)。若此時繼續調用阻塞的?read()
,它會一直等待新數據到來,導致線程阻塞。這導致其他文件描述符的事件得不到處理(饑餓),且當前描述符的后續事件可能丟失(因為狀態未再次變化)。
ET 模式要求程序在收到事件后必須一次性處理完所有數據(直到返回?EAGAIN
)。非阻塞模式的?read()
/write()
?在數據不足時會立即返回?EAGAIN
?或?EWOULDBLOCK
,程序可據此安全停止讀取。
阻塞模式下,若最后一次?read()
?時內核緩沖區數據恰好讀完,調用會阻塞線程,直到新數據到來。非阻塞模式確保?read()
?總是立即返回,避免線程意外阻塞。
結論:ET 必須非阻塞,LT 可容忍阻塞(但仍推薦非阻塞)。
什么時候時候可以用ET,什么時候可以用LT?
從之前的輸出結果,我們可以看到二者一個很明顯的區別就是,使用ET時可以極大的減少通知次數,減少?epoll_wait()
?的返回次數(僅在狀態變化時觸發),降低系統調用開銷。在有成千上萬連接的高并發情況下,減少通知次數可以有效緩解服務器處理請求的壓力。把后續處理任務交給線程自行處理,能夠更好的響應其他的連接,而不是一直把精力耗費在單個連接上。
由此,我們便很容易發現二者之間的區別和優勢:
邊緣觸發(ET)的優勢
更高的性能潛力:
????????減少?epoll_wait()
?的返回次數(僅在狀態變化時觸發),降低系統調用開銷。
????????適合高并發場景(如 >10k 連接),能顯著減少 CPU 占用。
避免重復事件風暴:
????????對高頻事件(如套接字持續可寫)更友好,不會因狀態未變化而重復通知。
更精細的控制:
強制要求程序一次性處理所有數據,避免邏輯分散。
水平觸發(LT)的優勢
編程簡單可靠:
????????允許分批處理數據(例如一次?read()
?部分數據),未處理完的事件會持續觸發。
????????不易遺漏事件,適合快速開發。
行為可預測:
????????與傳統?select
/poll
?行為一致,遷移成本低。
????????對異常情況(如未處理完數據)更寬容。
資源友好:
????????適合低頻或突發流量場景(如 HTTP 短連接),不會因單次未處理完而卡死。
其次,epoll出現的時間較晚,它生而就是為高并發而生的。最初只支持邊緣觸發,算是一個歷史遺留問題,所以在使用epoll時使用邊緣觸發更常見。
何時更適合使用邊緣觸發(ET)?
高性能服務器
????????需要處理 >10k 并發連接(如游戲服務器、交易所系統)。
????????例如:WebSocket 長連接服務,ET 能減少可寫事件的重復通知。
需避免事件風暴的場景
????????監聽大量持續可寫的套接字(如日志廣播服務),LT 會頻繁通知,而 ET 僅在緩沖區從滿變為非滿時通知一次。
精細控制數據吞吐
????????需要最大化單次 I/O 效率的場景(如文件傳輸服務),配合非阻塞 I/O 一次性讀寫完整數據塊。
延遲敏感型應用
????????金融交易系統等低延遲場景,ET 減少內核到用戶態的事件傳遞次數。
何時更適合水平觸發(LT)?
開發效率優先的應用
????????原型開發、內部工具等,LT 的簡單性可降低調試成本。
低頻 I/O 場景
????????命令行工具、低頻數據采集服務(如傳感器上報)。
需要兼容舊代碼
????????從?select
/poll
?遷移到?epoll
?時,LT 行為一致,兼容性更好。
對吞吐要求不極端
????????普通 Web 服務器(如 Nginx 默認使用 ET,但 Apache 可選 LT)。
超時處理
超時機制是IO多路復用中的一個重要功能,它允許程序在等待IO事件時設置一個時間限制,防止程序無限期地阻塞。實際應用中,程序可能需要在等待IO事件的同時執行其他任務,或者在超時后采取某種默認行為。
在網絡編程中,客戶端可能需要在一定時間內等待服務器響應,超時后重試或斷開連接。
在多任務環境中,程序可能需要在等待IO事件的同時處理其他任務。
一個典型的例子就是游戲中的掛機黨,當游戲服務器中存在大量的掛機玩家會嚴重占用資源,也會影響其他正常玩家的游戲體驗。使用超時機制可以在規定時間內清除無響應的玩家,使資源平衡到其他正常玩家中。
在設置IO多路復用的超時機制時,需要傳入一個時間結構體,用于設置超時時間,可以精確到微秒級別
struct timeval {time_t tv_sec; // 秒數suseconds_t tv_usec; // 微秒數(1秒 = 1,000,000 微秒)
};
selcet示例:
#include<54func.h>int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");int fdw = open(argv[2], O_RDWR);ERROR_CHECK(fdr, -1, "open fdw error");printf("connected\n");char buf[1024];fd_set rdset;while (1){FD_ZERO(&rdset);FD_SET(STDIN_FILENO, &rdset);FD_SET(fdr, &rdset);struct timeval timeout;timeout.tv_sec = 3;timeout.tv_usec = 0;int ret = select(fdr+1, &rdset, NULL, NULL, &timeout);if(ret == 0){printf("timeout! disconnect\n");break;}if(FD_ISSET(STDIN_FILENO, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(STDIN_FILENO, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");write(fdw, buf, strlen(buf));}if(FD_ISSET(fdr, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(fdr, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");if(ret == 0){printf("B is disconnected\n");break;}printf("B:%s", buf);}}return 0;
}
ubuntu@ubuntu:~/MyProject/Linux/IO$ ./timeoutA 1.pipe
waiting for connect
connected
timeout! disconnect
對于有多個用戶同時進行連接時,僅向select里面放一個timeout,因為每收到一個消息,計時器就會重新計時,無法做到超時踢出的效果。這個時候我們可以改變一下思路,可以設置一個本地每秒鐘都會響應的計時器,并存儲上一次活動的時間。每當計時器響應時,就檢查當前時間與上一次的差值是否超過規定時間,超過就會自動下線。代碼實現如下:
#include<54func.h>int main(int argc, char const *argv[])
{ARGS_CHECK(argc, 3);printf("waiting for connect\n");int fdr = open(argv[1], O_RDONLY);ERROR_CHECK(fdr, -1, "open fdr error");int fdw = open(argv[2], O_RDWR);ERROR_CHECK(fdr, -1, "open fdw error");printf("connected\n");char buf[1024];fd_set rdset;time_t curtime = time(NULL);time_t lastactive = time(NULL);while (1){FD_ZERO(&rdset);FD_SET(STDIN_FILENO, &rdset);FD_SET(fdr, &rdset);struct timeval timeout;timeout.tv_sec = 1;timeout.tv_usec = 0;int ret = select(fdr+1, &rdset, NULL, NULL, &timeout);curtime = time(NULL);if(FD_ISSET(STDIN_FILENO, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(STDIN_FILENO, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");write(fdw, buf, strlen(buf));lastactive = time(NULL);}if(FD_ISSET(fdr, &rdset)){memset(buf, 0, sizeof(buf));int ret = read(fdr, buf, sizeof(buf));ERROR_CHECK(ret, -1, "STDIN ERROR");if(ret == 0){printf("B is disconnected\n");break;}printf("B:%s", buf);}if(curtime - lastactive > 3){printf("timeout! disconnect\n");break;}}return 0;
}