前言
作者:小蝸牛向前沖
名言:我可以接受失敗,但我不能接受放棄
??如果覺的博主的文章還不錯的話,還請
點贊,收藏,關注👀支持博主。如果發現有問題的地方歡迎?大家在評論區指正?
目錄
一、五種IO模型
1、什么是IO
2、感性的理解五種IO模型
3、理解五種IO模型
4、高級IO重要概念
?二、I/O多路轉接之select
1、select的基本概念和接口介紹
2、對select的理解?
三、select服務器的編寫
1、err.hpp和log.hpp
2、makefile和main.cc?
3、?selectServer.hpp和sock.hpp
4、測試?
?本期學習:IO五層模型的理解,select的接口常識及其多路轉接的理解,編寫select服務器。
一、五種IO模型
1、什么是IO
"IO" 通常指的是輸入/輸出(Input/Output)。
- 在計算機科學和編程中,輸入/輸出是指程序與外部世界、外部設備或其他程序之間進行數據交換的過程。
- 這些外部設備可以包括磁盤驅動器、網絡連接、鍵盤、鼠標、顯示器等。輸入是指程序接收來自外部環境的數據,輸出是指程序將數據發送到外部環境。
?IO的操作
- 從文件中讀取數據、向文件寫入數據、從網絡接收數據、向網絡發送數據,以及與硬件設備進行交互等。
- IO 操作通常是相對較慢的,因為它們涉及到與外部設備或網絡通信,而這些通信可能涉及到物理設備的限制或網絡延遲。?
本文主要討論文件上的IO。
我們在文件上寫入或者是讀取數據,在系統層面上就是調用read/recv這些函數借口,前面我們也談論過調用這些函數的本質其實在拷貝數據。
對于read/recv無非存在二種情況:
- 沒有數據,就會進行阻塞等待。
- 有數據就會進行拷貝,完成后返回。
這也就說明IO的本質是拷貝+等待?
那我們如何做到高效IO呢?
本質上我們只要減少等待的時間就可以。
下面我們通過一個故事感性的理解五種IO模型。
2、感性的理解五種IO模型
有這么幾個人,他們非常喜歡釣魚。
1號張三用一根釣魚竿釣魚,他喜歡一直盯這魚竿看魚有沒有上鉤。
2號李四也是一根釣魚竿釣魚,但是他就比較休閑,他是每隔一定時間看一下魚竿動了沒,沒動就去做別的事情。
3號王五也是一根釣魚竿釣魚,但他就比較有意思,他在魚竿上寄了一個鈴鐺,要是魚竿動了他就拉桿看有沒魚,沒聲音響就一直忙自己的時候。
4號趙六他覺的用一根魚竿釣魚的效率太慢了,于是就弄了一排魚竿,來會的在這一排魚竿旁邊走,看那個魚竿動了就拉起來。
5號小王他是個大老板,他喜歡吃這里釣的魚吃,自己時間又忙,于是他就讓他的屬下田七來這里釣魚。
在上面故事中的釣魚其實就分為等+釣。
那上面誰釣魚的效率高呢?我們知道等的比重越低,單位時間內釣魚的越高。
?那肯定是趙六的效率是最高的,因為他等的比例是最低的。
在程序員看來我們可以認為:
魚就是數據,魚塘就是內核空間,魚竿發生動作魚就緒是數據就緒的事情,魚竿我們就認為是文件描述符,釣魚的動作:recv/read系統接口的調用。
?五號任務就代表五種IO模型:
張三----------->阻塞式IO
李四----------->非阻塞式IO
王五----------->信號驅動式IO
趙六----------->多路轉接/多路復用
田七----------->異步IO(2這里的老板趙六相當魚操作系統,田七相當進程/線程)
3、理解五種IO模型
阻塞IO是最常見的IO模型
在阻塞 I/O 中,當應用程序發起一個 I/O 操作(比如讀取文件或者從網絡接收數據),程序會被阻塞(暫停執行),直到操作完成并且數據準備好被應用程序處理。
?
非阻塞IO?
非阻塞IO: 如果內核還未將數據準備好, 系統調用仍然會直接返回, 并且返回EWOULDBLOCK錯誤碼?
非阻塞IO往往需要程序員循環的方式反復嘗試讀寫文件描述符, 這個過程稱為輪詢. 這對CPU來說是較大的浪費, 一 般只有特定場景下才使用.?
??信號驅動IO
?信號驅動IO: 內核將數據準備好的時候, 使用SIGIO信號通知應用程序進行IO操作
IO多路轉接: ?
IO多路轉接: 雖然從流程圖上看起來和阻塞IO類似. 實際上最核心在于IO多路轉接能夠同時等待多個文件 描述符的就緒狀態?
??
?小結
- 任何IO過程中, 都包含兩個步驟. 第一是等待, 第二是拷貝.
- 而且在實際的應用場景中, 等待消耗的時間往 往都遠遠高于拷貝的時間. 讓IO更高效, 最核心的辦法就是讓等待的時間盡量少
4、高級IO重要概念
同步通信 vs 異步通信
- 所謂同步,就是在發出一個調用時,在沒有得到結果之前,該調用就不返回. 但是一旦調用返回,就得到返回值了; 換句話說,就是由調用者主動等待這個調用的結果;
- 異步則是相反,調用在發出之后,這個調用就直接返回了,所以沒有返回結果; 換句話說,當一個異步 過程調用發出后,調用者不會立刻得到結果; 而是在調用發出后,被調用者通過狀態、通知來通知調用 者,或通過回調函數處理這個調用
另外, 我們回憶在講多進程多線程的時候, 也提到同步和互斥. 這里的同步通信和進程之間的同步是完全不想干的概 念.?
- 進程/線程同步也是進程/線程之間直接的制約關系
- 是為完成某種任務而建立的兩個或多個線程,這個線程需要在某些位置上協調他們的工作次序而等待、 傳遞信息所產生的制約關系. 尤其是在訪問臨界資源的時候
?同學們以后在看到 "同步" 這個詞, 一定要先搞清楚大背景是什么. 這個同步, 是同步通信異步通信的同步, 還是進程同步與互斥的同步
阻塞 vs 非阻塞
阻塞和非阻塞關注的是程序在等待調用結果(消息,返回值)時的狀態
- 阻塞調用是指調用結果返回之前,當前線程會被掛起. 調用線程只有在得到結果之后才會返回.
- 非阻塞調用指在不能立刻得到結果之前,該調用不會阻塞當前線程
?二、I/O多路轉接之select
1、select的基本概念和接口介紹
這里我們先一起達成一個公識:IO的本質=等+拷貝。
select是一個系統調用只負責等,可以等待多個fd,select本身沒有數據拷貝的能力,拷貝還是要read,write來完成。
系統提供select函數來實現多路復用輸入/輸出模型.
- select系統調用是用來讓我們的程序監視多個文件描述符的狀態變化的;
- 程序會停在select這里等待,直到被監視的文件描述符有一個或多個發生了狀態改變;
select函數原型
包含的頭文件
#include <sys/time.h>#include <sys/types.h>#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);
參數說明?
nfds
:監視的文件描述符中最大的文件描述符值加一。readfds
:指向一個?fd_set
?結構的指針,用于指定一組待檢查是否可讀的文件描述符。writefds
:指向一個?fd_set
?結構的指針,用于指定一組待檢查是否可寫的文件描述符。exceptfds
:指向一個?fd_set
?結構的指針,用于指定一組待檢查是否異常的文件描述符。timeout
:指向?struct timeval
?結構的指針,用于設置?select()
?調用的超時時間,如果為?NULL
?則表示不設置超時,會一直阻塞直到有文件描述符就緒或者被信號中斷。
參數timeout取值
- NULL:則表示select()沒有timeout,select將一直被阻塞,直到某個文件描述符上發生了事件;
- 0:非阻塞僅檢測描述符集合的狀態,然后立即返回,并不等待外部事件的發生。
- 特定的時間值:如果在指定的時間段里沒有事件發生,select將超時返回。
- struct timeval timeout{5,0}:表示5秒以內阻塞,超過5秒,非阻塞返回一次。
返回值?
- ret>0?告訴系統育多少個fd就緒
- ret==0調用超時,返回
- ret<0調用失敗
fd_set:位圖結構,表示文件描述符的集合?
其實這個結構就是一個整數數組, 更嚴格的說, 是一個 "位圖". 使用位圖中對應的位來表示要監視的文件描述符
?提供了一組操作fd_set的接口, 來比較方便的操作位圖.
- void FD_CLR(int fd, fd_set *set); // 用來清除描述詞組set中相關fd 的位
- int FD_ISSET(int fd, fd_set *set); // 用來測試描述詞組set中相關fd 的位是否為真
- void FD_SET(int fd, fd_set *set); // 用來設置描述詞組set中相關fd的位
- void FD_ZERO(fd_set *set); // 用來清除描述詞組set的全部位
關于timeval結構?
timeval結構用于描述一段時間長度,如果在這個時間內,需要監視的描述符沒有事件發生則函數返回,返回值為 0
函數返回值:?
- 執行成功則返回文件描述詞狀態已改變的個數
- ?如果返回0代表在描述詞狀態改變前已超過timeout時間,沒有返回
- 當有錯誤發生時則返回-1,錯誤原因存于errno,此時參數readfds,writefds, exceptfds和timeout的 值變成不可預測。
錯誤值可能為:
- EBADF (ebadf)文件描述詞為無效的或該文件已關閉
- EINTR(eintr) 此調用被信號所中斷
- EINVAL(einval)?參數n 為負值。
- ENOMEM(enomem) 核心內存不足
2、對select的理解?
理解select模型的關鍵在于理解fd_set,為說明方便,取fd_set長度為1字節,fd_set中的每一bit可以對應一個文件描 述符fd。則1字節長的fd_set最大可以對應8個fd
- (1)執行fd_set set; FD_ZERO(&set);則set用位表示是0000,0000。
- (2)若fd=5,執行FD_SET(fd,&set); 后set變為0001,0000(第5位置為1)?
- (3)若再加入fd=2,fd=1,則set變為0001,0011 。
- (4)執行 select(6,&set,0,0,0)阻塞等待 。
- (5)若fd=1,fd=2上都發生可讀事件,則select返回,此時set變為 0000,0011。注意:沒有事件發生的fd=5被清空。
?socket就緒條件
讀就緒
- socket內核中, 接收緩沖區中的字節數, 大于等于低水位標記SO_RCVLOWAT. 此時可以無阻塞的讀該文件描述符, 并且返回值大于0;
- socket TCP通信中, 對端關閉連接, 此時對該socket讀, 則返回0;
- 監聽的socket上有新的連接請求;
- socket上有未處理的錯誤;
寫就緒?
- socket內核中, 發送緩沖區中的可用字節數(發送緩沖區的空閑位置大小), 大于等于低水位標記 SO_SNDLOWAT, 此時可以無阻塞的寫, 并且返回值大于0;
- socket的寫操作被關閉(close或者shutdown). 對一個寫操作被關閉的socket進行寫操作, 會觸發SIGPIPE 信號;
- socket使用非阻塞connect連接成功或失敗之后;
- socket上有未讀取的錯誤;
select的特點?
- 可監控的文件描述符個數取決與sizeof(fd_set)的值. 我這邊服務器上sizeof(fd_set)=512,每bit表示一個文件 描述符,則我服務器上支持的最大文件描述符是512*8=4096. 將fd加入select監控集的同時,還要再使用一個數據結構array保存放到select監控集中的fd, 一是用于再select 返回后,array作為源數據和fd_set進行FD_ISSET判斷。
- 二是select返回后會把以前加入的但并無事件發生的fd清空,則每次開始select前都要重新從array取得 fd逐一加入(FD_ZERO最先),掃描array的同時取得fd最大值maxfd,用于select的第一個參數。
select缺點?
- 每次調用select, 都需要手動設置fd集合, 從接口使用角度來說也非常不便.
- 每次調用select,都需要把fd集合從用戶態拷貝到內核態,這個開銷在fd很多時會很大 同時每次調用select都需要在內核遍歷傳遞進來的所有fd,這個開銷在fd很多時也很大
- select支持的文件描述符數量太小
?select是如何實現多路轉接的
準備監視的文件描述符集合:程序通過向內核傳遞一個文件描述符集合,告訴內核它希望監視哪些文件描述符的狀態變化。
調用
select
系統調用:程序調用select
系統調用,并將準備好進行 I/O 操作的文件描述符集合傳遞給內核。內核監視文件描述符狀態變化:內核開始監視這些文件描述符的狀態變化。如果其中任何一個文件描述符的狀態發生變化(例如,變為可讀、可寫或出現異常),內核將返回給程序。
程序處理返回結果:程序從
select
返回的結果中獲取到哪些文件描述符準備好進行 I/O 操作,然后針對這些文件描述符執行相應的 I/O 操作。通常,程序會使用read
、write
等系統調用來實際進行 I/O 操作。
?select
的實現通常使用輪詢技術,內核會遍歷程序提供的所有文件描述符,檢查它們的狀態是否發生變化。這種方式雖然簡單,但效率較低,尤其在文件描述符數量較多時會導致性能下降。
上面我們理解select進行多路轉接的原理,下面我們自己寫一個select多路轉接的服務器加深理解。?
三、select服務器的編寫
1、err.hpp和log.hpp
err.hpp
#pragma once#include <iostream>enum
{USAGE_ERR = 1, // usage_errSOCKET_ERR, // socket_errBIND_ERR, // bind_errLISTEN_ERR // listen_err
};
log.hpp
#pragma once#include <iostream>
#include <string>
#include <cstdarg>
#include <ctime>
#include <unistd.h>// debug
#define DEBUG 0
// normal
#define NORMAL 1
// warning
#define WARNING 2
// error
#define ERROR 3
// fatal
#define FATAL 4const char *to_levelstr(int level)
{switch (level){case DEBUG:return "DEBUG";case NORMAL:return "NORMAL";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return nullptr;}
}void logMessage(int level, const char *format, ...)
{
#define NUM 1024char logprefix[NUM];snprintf(logprefix, sizeof(logprefix), "[%s][%ld][pid: %d]",to_levelstr(level), (long int)time(nullptr), getpid());char logcontent[NUM];va_list arg;va_start(arg, format);vsnprintf(logcontent, sizeof(logcontent), format, arg);std::cout << logprefix << logcontent << std::endl;
}
2、makefile和main.cc?
makefile
select_server: main.ccg++ -o $@ $^ -std=c++11
.PHONY:clean
clean:rm -f select_server
main.cc??
#include "selectServer.hpp"
#include "err.hpp"
#include <memory>using namespace std;
using namespace select_ns;static void usage(std::string proc)
{std::cerr << "Usage:\n\t" << proc << " port"<< "\n\n";
}std::string transaction(const std::string &request)
{return request;
}// ./select_server 8081
int main(int argc, char *argv[])
{// if(argc != 2)// {// usage(argv[0]);// exit(USAGE_ERR);// }// unique_ptr<SelectServer> svr(new SelectServer(atoi(argv[1])));// std::cout << "test: " << sizeof(fd_set) * 8 << std::endl;unique_ptr<SelectServer> svr(new SelectServer(transaction));svr->initServer();svr->start();return 0;
}
3、?selectServer.hpp和sock.hpp
?selectServer.hpp
#pragma once#include <iostream>
#include <string>
#include <functional>
#include "sock.hpp"namespace select_ns
{static const int defaultport = 8081;static const int fdnum = sizeof(fd_set) * 8;static const int defaultfd = -1;using func_t = std::function<std::string(const std::string &)>;class SelectServer{public:SelectServer(func_t f, int port = defaultport) : func(f), _port(port), _listensock(-1), fdarray(nullptr){}void initServer(){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);fdarray = new int[fdnum];for (int i = 0; i < fdnum; i++){fdarray[i] = defaultfd;}fdarray[0] = _listensock;}void Print(){std::cout << "fd list: ";for (int i = 0; i < fdnum; i++){if (fdarray[i] != defaultfd)std::cout << fdarray[i] << " ";}std::cout << std::endl;}void Accepter(int listensock){logMessage(DEBUG, "Accepter in");// select 告訴我, listensock讀事件就緒了std::string clientip;uint16_t clientport = 0;int sock = Sock::Accept(listensock, &clientip, &clientport);if (sock < 0)return;logMessage(NORMAL, "accept success [%s:%d]", clientip.c_str(), clientport);// sock我們能直接recv/read 嗎?不能,整個代碼,只有select有資格檢測事件是否就緒// 將新的sock 托管給select!// 將新的sock托管給select的本質,其實就是將sock,添加到fdarray數組中即可!int i = 0;// 找fdarray字符集中沒有被占用的位置for (; i < fdnum; i++){if (fdarray[i] != defaultfd)continue;elsebreak;}if (i == fdnum){logMessage(WARNING, "server if full, please wait");close(sock);}else{fdarray[i] = sock;}Print();logMessage(DEBUG, "Accepter out");}void Recver(int sock, int pos){logMessage(DEBUG, "in Recver");// 1. 讀取request// 這樣讀取是有問題的!char buffer[1024];ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0);if (s > 0){buffer[s] = 0;logMessage(NORMAL, "client# %s", buffer);}else if (s == 0){close(sock);fdarray[pos] = defaultfd;logMessage(NORMAL, "client quit");return;}else{close(sock);fdarray[pos] = defaultfd;logMessage(ERROR, "client quit: %s", strerror(errno));return;}// 2、處理requeststd::string response = func(buffer);// 3、返回responsewrite(sock, response.c_str(), response.size());logMessage(DEBUG, "out Recver");}// 1. handler event rfds 中,不僅僅是有一個fd是就緒的,可能存在多個// 2. 我們的select目前只處理了read事件void HandlerReadEvent(fd_set &rfds){// 遍歷fdarray數組for (int i = 0; i < fdnum; i++){// 過濾掉非法的fdif (fdarray[i] == defaultfd)continue;// 正常的fd,不一定就緒了if (FD_ISSET(fdarray[i], &rfds) && fdarray[i] == _listensock)Accepter(_listensock);else if (FD_ISSET(fdarray[i], &rfds))Recver(fdarray[i], i);else{}}}void start(){for (;;){fd_set rfds;FD_ZERO(&rfds);int maxfd = fdarray[0];for (int i = 0; i < fdnum; i++){if (fdarray[i] == defaultfd)continue;FD_SET(fdarray[i], &rfds); // 將合法的fd全部添加到讀文件描述符中// 更新最大的maxfdif (maxfd < fdarray[i])maxfd = fdarray[i];}logMessage(NORMAL, "max fd is: %d", maxfd);int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);switch (n){case 0:logMessage(NORMAL, "timeout...");break;case -1:logMessage(WARNING, "select error, code: %d, err string: %s", errno, strerror(errno));break;default:// 說明有事件就緒了,目前只有一個監聽事件就緒了logMessage(NORMAL, "have event ready!");HandlerReadEvent(rfds);// HandlerWriteEvent(wfds);break;}}}~SelectServer(){if (_listensock < 0)close(_listensock);if (fdarray)delete fdarray;}private:int _port;int _listensock;int *fdarray;func_t func;};
}
sock.hpp?
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#include "err.hpp"class Sock
{const static int backlog = 32; // sokc listen的數量public:static int Socket(){// 1創建套接字// int sock = socket(AF_FILE, SOCK_STREAM, 0); // af_file,sock_stream errrorint sock = socket(AF_INET, SOCK_STREAM, 0);//af_inetif (sock < 0){logMessage(FATAL, "create socket error");exit(SOCKET_ERR);}logMessage(NORMAL, "create socket success: %d", sock);int opt = 1;setsockopt(sock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); // sol_socket,so_reuseaddr,so_reuseport//服務器重啟后可快速復用地址和端口return sock;}static void Bind(int sock, int port){// 2bind綁定網絡信息struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET; // afinetlocal.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY; // inaddr_anyif (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0){logMessage(FATAL, "bind socket error");exit(BIND_ERR);}logMessage(NORMAL, "bind socket success");}static void Listen(int sock){// 3設置sock為監聽if (listen(sock, backlog) < 0){logMessage(FATAL, "listen socket error");exit(LISTEN_ERR);}logMessage(NORMAL, "listen socket success");}static int Accept(int listensock, std::string *clientip, uint16_t *clientport){struct sockaddr_in peer;socklen_t len = sizeof(peer);int sock = accept(listensock, (struct sockaddr *)&peer, &len);if (sock < 0)logMessage(ERROR, "accept error, next");else{logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?*clientip = inet_ntoa(peer.sin_addr);*clientport = ntohs(peer.sin_port);}return sock;}
};
4、測試?
運行服務器
./select_server
客戶端連接?
telnet 127.0.0.1 8081