文章目錄
- 一、五種IO模型
- 二、非阻塞IO
- 1.fcntl
- 2.實現函數SetNoBlock
- 3.輪詢方式讀取標準輸入
- 三、I/O多路轉接之select
- 1.初識select
- 2.select函數原型
- 3.socket就緒條件
- 4.select的特點
- 5.select缺點
- 6.select使用案例--只讀取數據的server服務器
- 1.err.hpp
- 2.log.hpp
- 3.sock.hpp
- 4.select_server.hpp
- 5.main.cc
一、五種IO模型
什么是IO?什么是高效的IO?
我們知道,我們調用read/recv的時候,如果緩沖區中沒有數據,那么就會阻塞住,有數據的時候,read/recv會進行拷貝,完成之后返回。所以 IO = 等 + 數據拷貝
那么如何讓IO更加的高效呢,只要減少等待的比重即可。
我們以幾個人釣魚為例來說明五種IO模型
張三:釣魚的時候一直盯著魚竿,不做任何其他事情
李四:釣魚的時候,偶爾看看書,看看手機
王五:在魚竿上方一個鈴鐺,鈴鐺響了就說明有魚,在鈴鐺沒有的時間內,王五也看看書,看看手機
趙六:同時投放了多個魚竿,來回進行檢測看哪個魚竿上有魚
田七和小王:田七是一個老板,想吃魚,就讓小王去釣,釣滿一桶之后,小王聯系田七,田七來取魚
對于上面的幾種釣魚方式,魚就是數據,河為內核空間,魚漂:魚就緒,數據就緒事件,魚竿:文件描述符
釣魚的動作:read/recv調用
張三是阻塞式IO,李四是非阻塞式IO,王五是信號驅動式IO,趙六是多路轉接/多路復用,田七是異步IO
張三李四王五在效率上沒有差別,但是李四和王五可以做其他的事情,張三李四王五趙六每個人都等了,每個人都參與了IO的過程,稱為同步IO。田七沒有參與IO的兩個階段的任何一個階段,稱為異步IO
在我們看來趙六的釣魚效率是最高的,因為他等的比重比較低,單位時間內釣魚的效率就高
為什么多路轉接/多路復用是高效的代名詞:IO = 等(減少等的比重) + 拷貝
阻塞IO是最常見的IO模型
非阻塞IO: 如果內核還未將數據準備好, 系統調用仍然會直接返回, 并且返回EWOULDBLOCK錯誤碼.
非阻塞IO往往需要程序員循環的方式反復嘗試讀寫文件描述符, 這個過程稱為輪詢. 這對CPU來說是較大的浪費, 一般只有特定場景下才使用
信號驅動IO: 內核將數據準備好的時候, 使用SIGIO信號通知應用程序進行IO操作
IO多路轉接: 雖然從流程圖上看起來和阻塞IO類似. 實際上最核心在于IO多路轉接能夠同時等待多個文件描述符的就緒狀態.
異步IO: 由內核在數據拷貝完成時, 通知應用程序(而信號驅動是告訴應用程序何時可以開始拷貝數據).
任何IO過程中, 都包含兩個步驟. 第一是等待, 第二是拷貝. 而且在實際的應用場景中, 等待消耗的時間往往都遠遠高于拷貝的時間. 讓IO更高效, 最核心的辦法就是讓等待的時間盡量少
同步通信vs異步通信(synchronous communication/ asynchronouscommunication)
同步和異步關注的是消息通信機制
所謂同步,就是在發出一個調用時,在沒有得到結果之前,該調用就不返回. 但是一旦調用返回,就得到返回值了; 換句話說,就是由調用者主動等待這個調用的結果;
異步則是相反,調用在發出之后,這個調用就直接返回了,所以沒有返回結果; 換句話說,當一個異步過程調用發出后,調用者不會立刻得到結果; 而是在調用發出后,被調用者通過狀態、通知來通知調用者,或通過回調函數處理這個調用.
另外, 我們回憶在講多進程多線程的時候, 也提到同步和互斥. 這里的同步通信和進程之間的同步是完全不想干的概念
進程/線程同步也是進程/線程之間直接的制約關系
是為完成某種任務而建立的兩個或多個線程,這個線程需要在某些位置上協調他們的工作次序而等待、傳遞信息所產生的制約關系. 尤其是在訪問臨界資源的時候
阻塞 vs 非阻塞
阻塞和非阻塞關注的是程序在等待調用結果(消息,返回值)時的狀態
阻塞調用是指調用結果返回之前,當前線程會被掛起. 調用線程只有在得到結果之后才會返回.
非阻塞調用指在不能立刻得到結果之前,該調用不會阻塞當前線程
其他高級IO
非阻塞IO,紀錄鎖,系統V流機制,I/O多路轉接(也叫I/O多路復用),readv和writev函數以及存儲映射IO(mmap),這些統稱為高級IO
二、非阻塞IO
1.fcntl
一個文件描述符, 默認都是阻塞IO.
函數原型如下
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
傳入的cmd的值不同, 后面追加的參數也不相同.
fcntl函數有5種功能:
復制一個現有的描述符(cmd=F_DUPFD).
獲得/設置文件描述符標記(cmd=F_GETFD或F_SETFD).
獲得/設置文件狀態標記(cmd=F_GETFL或F_SETFL).
獲得/設置異步I/O所有權(cmd=F_GETOWN或F_SETOWN).
獲得/設置記錄鎖(cmd=F_GETLK,F_SETLK或F_SETLKW)
我們此處只是用第三種功能, 獲取/設置文件狀態標記, 就可以將一個文件描述符設置為非阻塞
2.實現函數SetNoBlock
基于fcntl, 我們實現一個SetNoBlock函數, 將文件描述符設置為非阻塞
void SetNoBlock(int fd)
{ int fl = fcntl(fd, F_GETFL); if (fl < 0){ perror("fcntl");return; }fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
使用F_GETFL將當前的文件描述符的屬性取出來(這是一個位圖).
然后再使用F_SETFL將文件描述符設置回去. 設置回去的同時, 加上一個O_NONBLOCK參數.
3.輪詢方式讀取標準輸入
#include <iostream>
#include <unistd.h>
#include <fcntl.h>void SetNonBlock(int fd)
{int fl = fcntl(fd, F_GETFL);if (fl < 0){perror("fcntl");return;}fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}int main()
{SetNonBlock(0);while (true){char buffer[1024];ssize_t s = read(0, buffer, sizeof(buffer) - 1);if (s > 0){std::cout << buffer << std::endl;}else{perror("read");sleep(1);continue;}}return 0;
}
三、I/O多路轉接之select
1.初識select
系統提供select函數來實現多路復用輸入/輸出模型.
select系統調用是用來讓我們的程序監視多個文件描述符的狀態變化的;
程序會停在select這里等待,直到被監視的文件描述符有一個或多個發生了狀態改變
select : IO = 等 + 拷貝
select 只負責等待,可以一次等待多個fd,select本身沒有數據拷貝的能力,拷貝要read,write來完成
2.select函數原型
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
參數解釋:
參數nfds是需要監視的最大的文件描述符值+1;
readfds,writefds,exceptfds分別對應于需要檢測的可讀文件描述符的集合,可寫文件描述符的集 合及異常文件描述符的集合;
參數timeout為結構timeval,用來設置select()的等待時間
fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout都是輸入輸出型參數
參數timeout取值:
NULL:則表示select()沒有timeout,select將一直被阻塞,直到某個文件描述符上發生了事件;
0:僅檢測描述符集合的狀態,然后立即返回,并不等待外部事件的發生。
特定的時間值:如果在指定的時間段里沒有事件發生,select將超時返回。
關于fd_set結構
其實這個結構就是一個整數數組, 更嚴格的說, 是一個 “位圖”. 使用位圖中對應的位來表示要監視的文件描述符.
提供了一組操作fd_set的接口, 來比較方便的操作位圖.
void FD_CLR(int fd, fd_set *set); // 用來清除描述詞組set中相關fd 的位
int FD_ISSET(int fd, fd_set *set); // 用來測試描述詞組set中相關fd 的位是否為真
void FD_SET(int fd, fd_set *set); // 用來設置描述詞組set中相關fd的位
void FD_ZERO(fd_set *set); // 用來清除描述詞組set的全部位
我們以讀事件為例來說明readfds,writefds,exceptfds
作為輸入時:表示用戶告訴內核,你要幫我關心一下,我給你的集合中的所有的fd的讀事件–哪些fd上的讀事件內核你要關心,比特位的位置,表示fd的數值,比特位的內容,表示是否關心
作為輸出時:內核告訴用戶,你所關心的多個fd中,有哪些已經就緒了。比特位的位置,表示fd的數值,比特位的內容,表示哪些fd上面的讀事件已經就緒了
這樣讓用戶和內核之間相互溝通,互相知曉對方要的或者關心的。
fd_set是一種類型,既然是一種類型,必定有大小,而且是固定的,所以能夠添加的fd的個數也是有上限的
關于timeval結構
timeval結構用于描述一段時間長度,如果在這個時間內,需要監視的描述符沒有事件發生則函數返回,返回值為0。
struct timaval
{time_t tv_sec; /* seconds */suseconds_t tv_usec; /* microseconds */
};傳輸參數:
nullptr : 阻塞式
struct timeval timeout = {0,0}; 非阻塞
struct timeval timeout = {5,0};
5s以內阻塞式,超過5s,非阻塞返回一次
函數返回值:
執行成功則返回文件描述詞狀態已改變的個數
如果返回0代表在描述詞狀態改變前已超過timeout時間,沒有返回
當有錯誤發生時則返回-1,錯誤原因存于errno,此時參數readfds,writefds, exceptfds和timeout的值變成不可預測。
ret > 0 : 有幾個fd就緒了
ret == 0 : 超時返回了
ret < 0 : select 調用失敗了
錯誤值可能為:
EBADF 文件描述詞為無效的或該文件已關閉
EINTR 此調用被信號所中斷
EINVAL 參數n 為負值。
ENOMEM 核心內存不足
3.socket就緒條件
讀就緒
socket內核中, 接收緩沖區中的字節數, 大于等于低水位標記SO_RCVLOWAT. 此時可以無阻塞的讀該文件描述符, 并且返回值大于0;
socket TCP通信中, 對端關閉連接, 此時對該socket讀, 則返回0;
監聽的socket上有新的連接請求;
socket上有未處理的錯誤;
寫就緒
socket內核中, 發送緩沖區中的可用字節數(發送緩沖區的空閑位置大小), 大于等于低水位標記
SO_SNDLOWAT, 此時可以無阻塞的寫, 并且返回值大于0;
socket的寫操作被關閉(close或者shutdown). 對一個寫操作被關閉的socket進行寫操作, 會觸發SIGPIPE信號;
socket使用非阻塞connect連接成功或失敗之后;
socket上有未讀取的錯誤;
異常就緒
socket上收到帶外數據. 關于帶外數據, 和TCP緊急模式相關(TCP協議頭中, 有一個緊急指針的字段),
4.select的特點
1.可監控的文件描述符個數取決與sizeof(fd_set)的值. 我這邊服務器上sizeof(fd_set)=512,每bit表示一個文件描述符,則我服務器上支持的最大文件描述符是512*8=4096。即select能同時等待的文件fd是有上限的,除非重新改變內核,否則無法解決。
2.將fd加入select監控集的同時,還要再使用一個數據結構array保存放到select監控集中的fd,
2.1一是用于再select 返回后,array作為源數據和fd_set進行FD_ISSET判斷。即必須借助第三方數組,來維護合法的fd
2.2二是select返回后會把以前加入的但并無事件發生的fd清空,則每次開始select前都要重新從array取得fd逐一加入(FD_ZERO最先),掃描array的同時取得fd最大值maxfd,用于select的第一個參數。select第一個參數為什么是最大fd+1呢?確定遍歷范圍 – 內核層面
3.select的大部分參數是輸入輸出型的,調用select前,要重新設置所有的fd,調用之后,我們還要檢查更新所有的fd,這就帶來遍歷成本
4.select 采用位圖,用戶 -> 內核,內核 -> 用戶,來回的進行數據拷貝,拷貝成本問題
備注: fd_set的大小可以調整,可能涉及到重新編譯內核
5.select缺點
每次調用select, 都需要手動設置fd集合, 從接口使用角度來說也非常不便.
每次調用select,都需要把fd集合從用戶態拷貝到內核態,這個開銷在fd很多時會很大
同時每次調用select都需要在內核遍歷傳遞進來的所有fd,這個開銷在fd很多時也很大
select支持的文件描述符數量太小,有上限
6.select使用案例–只讀取數據的server服務器
1.err.hpp
#pragma onceenum
{USAGE_ERR = 1,SOCKET_ERR,BIND_ERR,LISTEN_ERR
};
2.log.hpp
#pragma once#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <stdarg.h>#define NORMAL 0
#define DEBUG 1
#define WARNING 2
#define ERROR 3
#define FATAL 4#define LOG_NORMAL "./log.txt"
#define LOG_ERR "./err.txt"#define NUM 1024const char *to_levelstr(int level)
{switch (level){case DEBUG:return "DEBUG";case NORMAL:return "NORMAL";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return nullptr;}
}void LogMessage(int level, const char *format, ...)
{// [日志等級] [時間戳/時間] [pid] [messge]char logprofix[NUM];snprintf(logprofix, sizeof logprofix, "[%s][%ld][pid:%d]", to_levelstr(level), (long int)time(nullptr), getpid());char logcontent[NUM];va_list arg;va_start(arg, format);vsnprintf(logcontent, sizeof logcontent, format, arg);std::cout << logprofix << logcontent << std::endl;FILE *log = fopen(LOG_NORMAL, "a");FILE *error = fopen(LOG_ERR, "a");if (log && error){FILE *cur = nullptr;if (level == DEBUG || level == NORMAL || level == WARNING)cur = log;if (level == ERROR || level == FATAL)cur = error;if (cur)fprintf(cur, "%s%s\n", logprofix, logcontent);fclose(log);fclose(error);}
}
3.sock.hpp
#pragma once#include <iostream>
#include <cstring>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>#include "log.hpp"
#include "err.hpp"class Sock
{static const int backlog = 32;public:// 1. 創建socket文件套接字對象static int Socket(){int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock < 0){LogMessage(FATAL, "create socket error");exit(SOCKET_ERR);}LogMessage(NORMAL, "create socket success:%d", sock);// 允許地址重用,使得在套接字關閉后,該套接字所使用的地址可以立即被其他套接字使用int opt = 1;setsockopt(sock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof opt);return sock;}// 2.bind自己的網絡信息static void Bind(int sock, const uint16_t &port){struct sockaddr_in local;memset(&local, 0, sizeof local);local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;int n = bind(sock, (struct sockaddr *)&local, sizeof local);if (n < 0){LogMessage(FATAL, "socket bind error");exit(BIND_ERR);}LogMessage(NORMAL, "socket bind success");}// 3. 設置socket 為監聽狀態static void Listen(int sock){int n = listen(sock, backlog);if (n < 0){LogMessage(FATAL, "socket listen error");exit(LISTEN_ERR);}LogMessage(NORMAL, "socket listen success");}// 4.獲取連接static int Accept(int listensock, std::string *clientip, uint16_t *clientport){struct sockaddr_in peer;socklen_t len = sizeof(peer);int sock = accept(listensock, (struct sockaddr *)&peer, &len);if (sock < 0){LogMessage(ERROR, "socket accept error,next");}else{LogMessage(NORMAL, "accept a new link success, get new sock: %d", sock);*clientip = inet_ntoa(peer.sin_addr);*clientport = ntohs(peer.sin_port);}return sock;}
};
4.select_server.hpp
#pragma once#include <iostream>
#include <cstring>
#include <cstdlib>
#include <cerrno>
#include <string>
#include <functional>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/select.h>#include "sock.hpp"
#include "log.hpp"
#include "err.h"namespace select_ns
{const static int defaultport = 8080;const static int fdnum = sizeof(fd_set) * 8;const static int defaultfd = -1;typedef std::function<std::string(std::string)> func_t;class selectServer{public:selectServer(const func_t &func, const uint16_t &port = defaultport): _port(port), _func(func), _listensock(-1), _fdarray(nullptr){}void initServer(){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);_fdarray = new int[fdnum];for (int i = 0; i < fdnum; i++)_fdarray[i] = defaultfd;_fdarray[0] = _listensock;}void Print(){std::cout << "fd list: ";for (int i = 0; i < fdnum; i++){if (_fdarray[i] != defaultfd){std::cout << _fdarray[i] << " ";}}}void Accepter(int listensock){std::string clientip;uint16_t clientport;int sock = Sock::Accept(listensock, &clientip, &clientport);if (sock < 0)return;LogMessage(NORMAL, "accept success[%s:%d]", clientip.c_str(), clientport);// sock我們能直接recv/read 嗎?不能,整個代碼,只有select有資格檢測事件是否就緒// 將新的sock 托管給select!// 將新的sock托管給select的本質,其實就是將sock,添加到fdarray數組中即可!int i = 0;for (; i < fdnum; i++){if (_fdarray[i] != defaultfd)continue;elsebreak;}if (i == fdnum){LogMessage(WARNING, "server is full,please wait");close(sock);}else_fdarray[i] = sock;Print();}void Recver(int sock, int pos){// 1. 讀取requestchar buffer[1024];ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0);if (s > 0){buffer[s] = 0;LogMessage(NORMAL, "client#: %s", buffer);}else if (s == 0){close(sock);_fdarray[pos] = defaultfd;LogMessage(NORMAL, "client quit");return;}else{close(sock);_fdarray[pos] = defaultfd;LogMessage(ERROR, "client error:%s", strerror(errno));return;}// 2. 處理requeststd::string response = _func(buffer);// 3. 返回responsewrite(sock, response.c_str(), response.size());}void HandlerEvent(fd_set &rfds){for (int i = 0; i < fdnum; i++){if (_fdarray[i] == defaultfd)continue;if (FD_ISSET(_fdarray[i], &rfds) && _fdarray[i] == _listensock)Accepter(_listensock);else if (FD_ISSET(_fdarray[i], &rfds))Recver(_fdarray[i], i);else{}}}void start(){for (;;){fd_set rfds;FD_ZERO(&rfds);int maxfd = _fdarray[0];for (int i = 0; i < fdnum; i++){if (_fdarray[i] == defaultfd)continue;FD_SET(_fdarray[i], &rfds); // 合法 fd 全部添加到讀文件描述符集中if (maxfd < _fdarray[i])maxfd = _fdarray[i]; // 更新所有fd中最大的fd}int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);switch (n){case 0:LogMessage(NORMAL, "timeout...");break;case -1:LogMessage(WARNING, "select error,code: %d, err string: %s", errno, strerror(errno));break;default:// 說明有事件就緒了,目前只有一個監聽事件就緒了LogMessage(NORMAL, "have event ready!");HandlerEvent(rfds);break;}}}~selectServer(){if (_listensock > 0)close(_listensock);if (_fdarray)delete[] _fdarray;}private:int _port;int _listensock;int *_fdarray;func_t _func;};
}
5.main.cc
#include "selectServer.hpp"
#include "err.hpp"
#include <memory>using namespace std;
using namespace select_ns;static void Usage(const string proc)
{std::cerr << "Usage:\n\t" << proc << " port\n\n";
}string transaction(const string &request)
{return request;
}int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);exit(USAGE_ERR);}// std::unique_ptr<selectServer> svr(new selectServer(transaction,atoi(argv[0])));std::unique_ptr<selectServer> svr(new selectServer(transaction));svr->initServer();svr->start();return 0;
}