什么是高效的IO?
正常情況下,IO=等+拷貝
高效的IO=拷貝(即讓IO盡量不等)
為什么我們平常玩電腦的時候,感覺不到等待的過程呢?
任何通信場景,IO通信場景,效率一定是有上限的. 花盆里,長不出參天大樹。也就是說任何通信場景下的IO都有等待的過程,只是因為我們的本地主機硬件彼此之間都離得很近,等待的過程很短,所以我們感覺不到,你換做網絡通信,通信雙方離千里之外,你就很能感覺到這個等待的過程了。
如何提高IO的效率?
單位時間內,等待的比重越低,IO 效率越高!
提高IO的效率,其實就是要提高拷貝操作在IO操作中的比重,減少等待的比重!
五種IO模型
五種IO模型分別是哪五種?他們有啥區別?
- 阻塞式IO
- 非阻塞IO輪詢
- 信號驅動式IO
- 多路轉接/多路復用IO
- 異步IO
我們可以通過下面的例子去理解:
假如說在你老家有一個池塘,然后有很多人喜歡去這個池塘里面釣魚。
在一個風和日麗的下午,有倆人在池邊釣魚,一個叫張三,一個叫李四。這哥倆的釣魚裝備都一樣,但是他們釣魚的方式有些許差別:張三在魚上鉤之前一直在觀察魚竿有沒有動靜,如果發現沒有動靜,那他也不干別的事情,就接著等,一直等到有動靜為止。而李四并不是一直在觀察魚竿有沒有動靜,他只是定時的過來查看一下,如果檢測到魚竿沒有動靜。他就會去做別的事情,然后過一段時間再來看看魚竿有沒有動靜。
過了一會兒又來了一個人叫王五,他走到池塘邊放桿之后,在魚竿的頭部系了一個鈴鐺,然后把桿子往那兒一放,自己低頭玩手機。每當鈴鐺響的時候,王五就起來收桿,收完桿之后把桿子一放,繼續玩手機。
又過了一會兒,來了一個人叫趙六,這個人很有錢。他沿著池塘的邊兒放了100根魚竿兒,自己就圍著池塘來回巡邏,巡邏的過程中一看見哪個魚竿有魚上鉤了,他趕緊就去收。
最后一個趕來的人叫田七,這個人是個大老板,平時事務非常繁忙,但是就喜歡釣魚,來到這里之后還沒釣一會兒來,這時候突然公司里有急事兒,釣不了魚了。他就吩咐他的司機小王說,小王你給我釣一下。今天下午你釣完10條魚以后,給我打個電話。我過來開始接你回去,完不成任務,你就一直擱這給我調。
在上面的例子中,你如果將釣魚這件事情理解成IO。將釣魚的人理解成計算機中的進程,那些魚竿兒理解成Io的目標文件(文件描述符)。就可以比較好的理解。五種不同的io方式之間的區別。其實我們可以看到這5個人的做法應該是一個比一個高效的,因為他們從上往下等待的時間越來越少。釣魚過程中自己騰出來的時間越來越多
問題1:阻塞式IO 與 非阻塞式IO的區別在哪里?(張三和李四的做法有什么區別?)
張三是在魚上鉤之前一直在觀察魚竿有沒有動靜,如果發現沒有動靜,那他就接著等,一直等到有動靜為止。而李四并不是一直在觀察魚竿有沒有動靜,他只是定時的過來查看一下,如果檢測到魚竿沒有動靜。他就會去做別的事情,然后過一段時間再來看看魚竿有沒有動靜。
這倆人的做法的核心區別在于,當他們檢測到魚竿沒有動靜的時候,他們的處理方式是不一樣的,張三的處理方式是沒有等到我接著等。而李四的處理方式是沒有等到,我就去干別的事兒,過一會兒我再來看。
阻塞式IO與非阻塞式IO的核心區別也在于此,阻塞式IO如果沒等到,就會一直在等,而非阻塞式IO如果沒等到,他不會一直等,而是回去干別的事情,過一段時間之后再來檢測。
但是值得注意的是,張三和李四在魚沒有上鉤之前干了什么,魚一旦上鉤,他們干的事情都是一樣的,也就是說——阻塞式IO 與 非阻塞式IO 拷貝操作的效率沒有任何區別!
問題2:如何理解 非阻塞式IO 的效率比 阻塞式IO 要高 ?
我們經常會聽到一種說法叫做:非阻塞式IO 的效率比 阻塞式IO 要高。這個效率應該如何理解?是不是意味著相同的時間內李四釣的魚比張三釣的魚多呢?
其實并不是,這兩個人釣到魚的多少取決于池塘里邊的魚咬誰的鉤咬的多。但是在池塘里的魚看來,我又不知道張三和李四在我沒咬鉤的時候在干什么?我看到的水底下的兩個鉤子是差不多的。那我咬他們兩個鉤的概率就是一樣的。那按照這個道理。同樣的時間內李四釣到的魚應該和張三釣的魚一樣多。
也就是說對于同一個IO任務來說,計算機無論是采用 非阻塞式IO 的策略,還是 阻塞式IO 的策略,可能處理這一個任務的時間都是相同的
既然如此,那為什么我們還說非阻塞式IO 的效率比 阻塞式IO 要高呢?我們可以從下面兩種角度去理解。
(1)計算機除了要處理io,還要進行很多其他的操作。在相同的時間內,計算機采用非阻塞式IO 的方案進行處理,完成的總工作量要比采用阻塞式IO要多。
李是在下午釣魚的這段時間內。不僅釣上魚他還看了很多小說,刷了很多視頻。是張三在這個下午完成的工作僅僅是釣了這么多魚。因此李四完成的總任務比張還要多,因此我們說李四的效率比張三高。
(2)采用非阻塞式IO ,在沒有等到IO事件之前。計算機可以去做別的事情,這個別的事情也可以是其他類型的IO,這樣采用非阻塞式IO,計算機處理的全部IO工作量就比阻塞式IO多了
用我們的例子去理解,“非阻塞IO效率高” 他的意思是—— 李四可以在同樣的時間,做更多的其他事情!
問題3:上面的五種IO,誰的效率是最高的? (這一下午這5個人誰釣的魚最多?)
趙六!!下午這5個人全部加入之后,池塘里邊一共有104根兒魚竿兒。其中100根魚竿兒都是趙六的!除了趙六之外,其他的所有人釣魚都只用一根魚竿。我們假設這池子里面的魚咬每一根魚竿兒的概率是一樣的,那傻子都知道這一下午肯定是趙六釣的魚是最多的。
那么在計算機中,多路轉接/多路復用IO的效率就是最高的,也就是說在相同的時間內,采用這種方式處理的IO工作量是最多的。
趙六, 任意一個魚竿(fd), 魚(數據)就緒的概率很高 IO = 等+拷貝
一個人, 檢測多個魚竿, 降低了等的比重
問題3:信號驅動式IO最大的特點是什么?效率咋樣?(王五的做法和前面倆人的做法最本質的區別在哪里?)
張三就是一直在主動的檢測這個魚有沒有上鉤。李四雖然經常去干別的事情,但他也會定期去看看這個魚竿。有沒有動靜,他心里至少還掛念著這個魚竿。但是王五他是真一點兒都不掛念,全程低頭玩手機,如果不是有這個鈴鐺叫,他是絕對不會主動抬頭的。
我們前面講IO的工作分成等待和拷貝兩個部分。
在信號驅動式IO中,拷貝工作的開始是由信號觸發的,也就是說你不給我發信號,我永遠都不會開始拷貝。
問題4:這五種IO方式中,最后一種叫做異步IO,那有沒有同步IO呢?同步IO的定義是啥?信號產生不是異步的嘛?為什么信號驅動式IO屬于同步IO呢?
我們說的五種IO方式中,前面四種都是同步IO。
結合我們前面舉的例子,張三李四王五趙六這4個人。他們雖然在魚上鉤之前,等待的方式不一樣。但是當魚上鉤的時候,他們都會親手握住魚竿兒把魚拉上來。
對應在計算機中,雖然前四種IO方式的策略不同,但是當等待的IO事件發生的時候,都是由等待這個IO的進程親自去處理這個IO。而異步io指的是?我專門兒創建一個進程去處理這個IO,我后邊兒就一點兒都不問了。當IO完成的時候,讓那個進程自動把我要的東西給我。
問題5:同步IO和異步IO的區別是什么?(田七和前面四個釣魚佬的區別是什么?)
結合我們剛剛說的例子。那4個人你別管他們等待的方式是什么的,但是他們都是在那魚塘邊兒待了一下午,都是親自拉桿兒把魚釣上來的。而田七調到一半他就跑了,就去當甩手掌柜去了。這個就很像我們現實生活中黑心煤礦的老板。你別看他的公司干的是煤礦,但是這個公司的中高層有百分之八九十都沒下過礦,他們都不知道這個煤是怎么挖的,他們也不知道煤礦有多危險,他們只知道工人冒著生命危險提取出來的煤礦,可以賣掉賺大錢。領導就是只負責給下屬提要求,他告訴下屬,我不管你們是怎么實現的,反正我只負責驗收,你們看著辦吧。
我們前面說,IO=等+拷貝
只要你參與了IO的過程(可能你只參與了等的過程,可能你只參與了拷貝的過程,這都算參與IO的過程),就是同步IO。像煤老板那種連煤礦都沒下過的,全程不參與IO過程的就是異步IO。
五種IO模型的實現方式
我們以UDP通信中,用戶調用recvfrom接收數據的場景為例,說明五種IO模型的不同實現方式
阻塞式IO
這個最簡單,只需要正常調用recvfrom就行了(因為創建套接字時默認就是阻塞方式),阻塞式IO的基本過程如下
下面的通過socket網絡編程實現簡單阻塞式IO的代碼
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>int main() {// 1. 創建 UDP Socket(默認即為阻塞模式)int sockfd = socket(AF_INET, SOCK_DGRAM, 0);if (sockfd < 0) {perror("socket 創建失敗");return -1;}// 2. 綁定地址和端口(作為服務器必須綁定)struct sockaddr_in local_addr;memset(&local_addr, 0, sizeof(local_addr));local_addr.sin_family = AF_INET; // IPv4local_addr.sin_addr.s_addr = INADDR_ANY; // 監聽所有網卡local_addr.sin_port = htons(8888); // 端口 8888if (bind(sockfd, (struct sockaddr*)&local_addr, sizeof(local_addr)) < 0) {perror("綁定端口失敗");close(sockfd);return -1;}printf("服務器啟動,等待數據(阻塞模式)...\n");// 3. 阻塞式接收數據(關鍵:未收到數據會一直卡在這里)char buf[1024] = {0};struct sockaddr_in client_addr; // 存儲客戶端地址socklen_t client_len = sizeof(client_addr);// 調用 recvfrom:若沒有數據,進程會進入休眠狀態(阻塞)ssize_t recv_len = recvfrom(sockfd, buf, sizeof(buf) - 1, // 留一個位置給字符串結束符0, (struct sockaddr*)&client_addr, &client_len);if (recv_len < 0) {perror("接收數據失敗");close(sockfd);return -1;}// 輸出收到的數據和客戶端信息buf[recv_len] = '\0'; // 手動添加字符串結束符printf("收到來自 %s:%d 的數據:%s\n",inet_ntoa(client_addr.sin_addr), // 客戶端 IPntohs(client_addr.sin_port), // 客戶端端口buf); // 數據內容close(sockfd);return 0;
}
非阻塞式IO
非阻塞IO的處理過程
具體實現原理也很簡單,只需要在創建套接字時將其設置為非阻塞模式即可
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>int main() {// 1. 創建 UDP Socketint sockfd = socket(AF_INET, SOCK_DGRAM, 0);if (sockfd < 0) {perror("socket failed");return -1;}// 2. 設置為非阻塞模式int flags = fcntl(sockfd, F_GETFL, 0);fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);// 3. 綁定地址(可選,若作為服務端需要綁定)struct sockaddr_in local_addr;memset(&local_addr, 0, sizeof(local_addr));local_addr.sin_family = AF_INET;local_addr.sin_addr.s_addr = INADDR_ANY;local_addr.sin_port = htons(8888);if (bind(sockfd, (struct sockaddr*)&local_addr, sizeof(local_addr)) < 0) {perror("bind failed");close(sockfd);return -1;}// 4. 模擬非阻塞 recvfromchar buf[1024] = {0};struct sockaddr_in peer_addr;socklen_t peer_len = sizeof(peer_addr);// 首次調用:數據未準備好時直接返回錯誤ssize_t ret = recvfrom(sockfd, buf, sizeof(buf), 0, (struct sockaddr*)&peer_addr, &peer_len);if (ret < 0) {// 非阻塞特有錯誤碼:EWOULDBLOCK/EAGAINif (errno == EWOULDBLOCK || errno == EAGAIN) {printf("數據未準備好,非阻塞直接返回\n");} else {perror("recvfrom error");}}// 5. 模擬「重試」或結合多路復用(如 select/poll/epoll)// 這里簡化為休眠 2 秒,假設期間有數據到達sleep(2);// 再次調用 recvfrom(假設此時數據已準備好)ret = recvfrom(sockfd, buf, sizeof(buf), 0, (struct sockaddr*)&peer_addr, &peer_len);if (ret > 0) {printf("收到數據:%s (來自 %s:%d)\n", buf, inet_ntoa(peer_addr.sin_addr), ntohs(peer_addr.sin_port));} else {perror("再次 recvfrom 失敗");}close(sockfd);return 0;
}
信號驅動式IO
下面是信號驅動式IO的流程圖
他具體實現起來的思想也很簡單,由于在這種方式中IO事件是靠信號遞達的,我們就在信號處理函數handle中調用recvfrom進行數據拷貝就行
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>int sockfd; // 全局套接字描述符,供信號處理函數使用
struct sockaddr_in client_addr;
socklen_t client_len;// 信號處理函數:當內核通知IO事件就緒時被調用
void sigio_handler(int signo) {char buf[1024] = {0};ssize_t recv_len;// 讀取數據(此時數據已就緒,不會阻塞)recv_len = recvfrom(sockfd, buf, sizeof(buf)-1, 0,(struct sockaddr*)&client_addr, &client_len);if (recv_len < 0) {perror("recvfrom failed");return;}buf[recv_len] = '\0';printf("收到來自 %s:%d 的數據: %s\n",inet_ntoa(client_addr.sin_addr),ntohs(client_addr.sin_port),buf);// 簡單回復客戶端const char* reply = "已收到數據";sendto(sockfd, reply, strlen(reply), 0,(struct sockaddr*)&client_addr, client_len);
}int main() {struct sockaddr_in server_addr;struct sigaction sa;// 1. 創建UDP套接字sockfd = socket(AF_INET, SOCK_DGRAM, 0);if (sockfd < 0) {perror("socket creation failed");exit(EXIT_FAILURE);}// 2. 綁定服務器地址和端口memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(8888);if (bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {perror("bind failed");close(sockfd);exit(EXIT_FAILURE);}// 3. 設置信號處理函數(捕獲SIGIO信號)sa.sa_handler = sigio_handler;sigemptyset(&sa.sa_mask);sa.sa_flags = 0;if (sigaction(SIGIO, &sa, NULL) < 0) {perror("sigaction failed");close(sockfd);exit(EXIT_FAILURE);}// 4. 設置套接字屬主,讓內核知道該向哪個進程發送SIGIO信號if (fcntl(sockfd, F_SETOWN, getpid()) < 0) {perror("fcntl F_SETOWN failed");close(sockfd);exit(EXIT_FAILURE);}// 5. 啟用信號驅動式IO(設置O_ASYNC標志)int flags = fcntl(sockfd, F_GETFL, 0);if (fcntl(sockfd, F_SETFL, flags | O_ASYNC) < 0) {perror("fcntl F_SETFL O_ASYNC failed");close(sockfd);exit(EXIT_FAILURE);}printf("信號驅動式IO服務器啟動,端口 8888...\n");printf("等待數據中(主線程可執行其他任務)...\n");// 6. 主線程可以執行其他任務,無需阻塞等待IOwhile (1) {// 模擬主線程處理其他業務sleep(1);// printf("主線程正在執行其他任務...\n");}close(sockfd);return 0;
}
IO多路轉接
操作系統給我們提供了專門的接口用于實現IO多路轉接,我們只需要學會如何使用就行了
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>#define MAX_EVENTS 100
#define BUFFER_SIZE 1024// 設置套接字為非阻塞模式
void set_nonblocking(int fd) {int flags = fcntl(fd, F_GETFL, 0);if (flags == -1) {perror("fcntl F_GETFL");exit(EXIT_FAILURE);}if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {perror("fcntl F_SETFL");exit(EXIT_FAILURE);}
}int main() {// 創建監聽套接字int listen_fd = socket(AF_INET, SOCK_STREAM, 0);if (listen_fd == -1) {perror("socket");exit(EXIT_FAILURE);}// 綁定地址和端口struct sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(8888);if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {perror("bind");close(listen_fd);exit(EXIT_FAILURE);}// 開始監聽if (listen(listen_fd, 5) == -1) {perror("listen");close(listen_fd);exit(EXIT_FAILURE);}// 創建 epoll 實例int epoll_fd = epoll_create1(0);if (epoll_fd == -1) {perror("epoll_create1");close(listen_fd);exit(EXIT_FAILURE);}// 注冊監聽套接字到 epollstruct epoll_event event;event.events = EPOLLIN;event.data.fd = listen_fd;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event) == -1) {perror("epoll_ctl add listen_fd");close(listen_fd);close(epoll_fd);exit(EXIT_FAILURE);}struct epoll_event events[MAX_EVENTS];while (1) {// 等待事件發生,最多等待 MAX_EVENTS 個事件int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);if (num_events == -1) {perror("epoll_wait");break;}for (int i = 0; i < num_events; i++) {if (events[i].data.fd == listen_fd) {// 有新的客戶端連接請求struct sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);int client_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len);if (client_fd == -1) {perror("accept");continue;}// 設置客戶端套接字為非阻塞模式set_nonblocking(client_fd);// 注冊客戶端套接字到 epollevent.events = EPOLLIN;event.data.fd = client_fd;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event) == -1) {perror("epoll_ctl add client_fd");close(client_fd);}printf("新客戶端連接: %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));} else if (events[i].events & EPOLLIN) {// 客戶端有數據可讀int client_fd = events[i].data.fd;char buffer[BUFFER_SIZE];ssize_t bytes_read = recv(client_fd, buffer, sizeof(buffer) - 1, 0);if (bytes_read == -1) {perror("recv");close(client_fd);epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);} else if (bytes_read == 0) {// 客戶端關閉連接printf("客戶端斷開連接: %d\n", client_fd);close(client_fd);epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);} else {buffer[bytes_read] = '\0';printf("收到客戶端 %d 數據: %s\n", client_fd, buffer);// 簡單回顯數據給客戶端if (send(client_fd, buffer, bytes_read, 0) == -1) {perror("send");close(client_fd);epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);}}}}}close(listen_fd);close(epoll_fd);return 0;
}
異步IO
簡單來說就是,用戶進程將這個IO的任務交給內核,內核把數據拷貝完成之后,再通知應用程序(在信號驅動式IO中,內核通過信號告知應用程序何時可以開始拷貝數據,拷貝數據這活還是得用戶進程自己來)
實現代碼
#include <iostream>
#include <fcntl.h>
#include <aio.h>
#include <signal.h>
#include <unistd.h>
#include <cstring>
#include <errno.h>// 緩沖區大小
#define BUFFER_SIZE 1024// 異步IO操作的控制塊
struct aiocb aio_cb;// 信號處理函數:當異步IO完成時被調用
void aio_completion_handler(int signo, siginfo_t* info, void* context) {if (info->si_signo == SIGIO) {// 檢查異步操作是否成功完成if (aio_error(&aio_cb) == 0) {// 獲取實際讀取的字節數ssize_t bytes_read = aio_return(&aio_cb);if (bytes_read > 0) {std::cout << "異步讀取完成,讀取了 " << bytes_read << " 字節: " << std::endl;std::cout << static_cast<char*>(aio_cb.aio_buf) << std::endl;} else if (bytes_read == 0) {std::cout << "已到達文件末尾" << std::endl;}} else {std::cerr << "異步讀取失敗: " << strerror(aio_error(&aio_cb)) << std::endl;}}
}int main(int argc, char* argv[]) {if (argc != 2) {std::cerr << "用法: " << argv[0] << " <文件名>" << std::endl;return 1;}const char* filename = argv[1];// 1. 打開文件(同步操作)int fd = open(filename, O_RDONLY);if (fd == -1) {std::cerr << "打開文件失敗: " << strerror(errno) << std::endl;return 1;}// 2. 初始化異步IO控制塊memset(&aio_cb, 0, sizeof(struct aiocb));// 分配緩沖區char* buffer = new char[BUFFER_SIZE];aio_cb.aio_buf = buffer;aio_cb.aio_nbytes = BUFFER_SIZE - 1; // 留一個字節給終止符aio_cb.aio_fildes = fd; // 文件描述符aio_cb.aio_offset = 0; // 讀取起始位置// 3. 設置信號處理:當異步IO完成時接收SIGIO信號struct sigaction sa;memset(&sa, 0, sizeof(struct sigaction));sa.sa_sigaction = aio_completion_handler; // 信號處理函數sa.sa_flags = SA_SIGINFO; // 使用sigaction風格的處理函數if (sigaction(SIGIO, &sa, NULL) == -1) {std::cerr << "設置信號處理失敗: " << strerror(errno) << std::endl;close(fd);delete[] buffer;return 1;}// 4. 設置文件描述符的所有者,讓內核知道向哪個進程發送信號if (fcntl(fd, F_SETOWN, getpid()) == -1) {std::cerr << "設置文件所有者失敗: " << strerror(errno) << std::endl;close(fd);delete[] buffer;return 1;}// 5. 啟動異步讀取操作if (aio_read(&aio_cb) == -1) {std::cerr << "啟動異步讀取失敗: " << strerror(errno) << std::endl;close(fd);delete[] buffer;return 1;}std::cout << "異步讀取已啟動,主線程可以執行其他任務..." << std::endl;// 6. 主線程執行其他任務(模擬)for (int i = 0; i < 5; ++i) {std::cout << "主線程正在執行任務 " << i + 1 << std::endl;sleep(1); // 模擬耗時操作}// 7. 清理資源close(fd);delete[] buffer;return 0;
}