前言
作者:小蝸牛向前沖
名言:我可以接受失敗,但我不能接受放棄
??如果覺的博主的文章還不錯的話,還請
點贊,收藏,關注👀支持博主。如果發現有問題的地方歡迎?大家在評論區指正??
目錄
一、poll函數基礎知識
1、poll函數接口
2、poll函數多路轉接的實現
二、poll服務器的實現?
三、epoll函數的基礎知識
1、epoll的相關系統調用
2、epoll工作原理
四、epoll服務器?
本期學習:poll函數的相關接口,poll函數是如何實現多路轉接的,epoll函數的學習,epoll函數的工作原理,poll函數和epoll函數服務器的實現。
在學習poll和npoll之前,我們先來回顧一下select的特點:
1. select能同時等待的文件fd是有上限的,除非重新改內核,否則無法解決
2.必須借助第三方數組,來維護合法的fd?
3. select的大部分參數是輸入輸出型的,調用select前,要重新設置所有的fd,調用之后,我們還有檢查更新所有的fd.這帶來的就是遍歷的成本―--用戶
4. select為什么第一個參數是最大fd+1呢?確定遍歷范圍--內核層面
5. select采用位圖,用戶->內核,內核->用戶,來回的進行數據拷貝,拷貝成本的問題
更加詳細的回顧:傳送門?
為了解決select在IO時會fd上限和每次調用都要重新設定關心fd,所以我們的poll函數就上亮登場了。
一、poll函數基礎知識
1、poll函數接口
頭文件
#include <poll.h>
函數接口?
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
?參數說明
- fds是一個poll函數監聽的結構列表. 每一個元素中, 包含了三部分內容: 文件描述符, 監聽的事件集合, 返 回的事件集合.
- nfds表示fds數組的長度.?
- timeout表示poll函數的超時時間, 單位是毫秒(ms)
pollfd
結構體的定義:
struct pollfd {int fd; // 文件描述符short events; // 要監視的事件(輸入)short revents; // 實際發生的事件(輸出)
};
events
?字段是要監視的事件。它是一個位掩碼,可以包括以下常量之一或多個:?
OLLIN
:文件描述符可讀。POLLOUT
:文件描述符可寫。POLLPRI
:文件描述符有緊急數據可讀。POLLERR
:發生錯誤。POLLHUP
:文件描述符掛起(連接關閉)。POLLNVAL
:文件描述符不是一個打開的文件
revents
?字段是實際發生的事件。它是?poll()
?函數返回后被填充的字段,表示文件描述符上實際發生的事件。
返回結果
- 返回值小于0, 表示出錯;
- 返回值等于0, 表示poll函數等待超時;
- 返回值大于0, 表示poll由于監聽的文件描述符就緒而返回
2、poll函數多路轉接的實現
多路轉接的實現
- 使用
poll()
函數時,通常需要創建一個pollfd
數組,每個元素描述一個要監視的文件描述符及其關注的事件。- 然后,將該數組傳遞給
poll()
函數,并指定超時時間(或者設置為-1
表示永遠等待),poll()
函數會阻塞直到有事件發生或超時。- 返回后,程序可以檢查每個
pollfd
結構體的revents
字段來判斷每個文件描述符上實際發生的事件。
?poll的優點
不同與select使用三個位圖來表示三個fdset的方式,poll使用一個pollfd的指針實現.
- pollfd結構包含了要監視的event和發生的event,不再使用select“參數-值”傳遞的方式. 接口使用比 select更方便.
- poll并沒有最大數量限制 (但是數量過大后性能也是會下降).
poll的缺點?
poll中監聽的文件描述符數目增多時
- 和select函數一樣,poll返回后,需要輪詢pollfd來獲取就緒的描述符.
- 每次調用poll都需要把大量的pollfd結構從用戶態拷貝到內核中.
- 同時連接的大量客戶端在一時刻可能只有很少的處于就緒狀態, 因此隨著監視的描述符數量的增長, 其效率也會線性下降.
二、poll服務器的實現?
pollServer.hpp
#pragma once#include <iostream>
#include <string>
#include <functional>
#include <poll.h>
#include "sock.hpp"namespace poll_ns
{static const int defaultport = 8081;static const int num = 2048;static const int defaultfd = -1;using func_t = std::function<std::string(const std::string &)>;class PollServer{public:PollServer(func_t f, int port = defaultport) : _func(f), _port(port), _listensock(-1), _rfds(nullptr){}void ResetItem(int i){_rfds[i].fd = defaultfd;_rfds[i].events = 0;_rfds[i].revents = 0;}void initServer(){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);_rfds = new struct pollfd[num];for (int i = 0; i < num; i++){ResetItem(i);}_rfds[0].fd = _listensock;_rfds[0].events = POLLIN;}void Print(){std::cout << "fd list: ";for (int i = 0; i < num; i++){if (_rfds[i].fd != defaultfd)std::cout << _rfds[i].fd << " ";}std::cout << std::endl;}void Accepter(int listensock){logMessage(DEBUG, "Accepter in");// select 告訴我, listensock讀事件就緒了std::string clientip;uint16_t clientport = 0;int sock = Sock::Accept(listensock, &clientip, &clientport);if (sock < 0)return;logMessage(NORMAL, "accept success [%s:%d]", clientip.c_str(), clientport);// sock我們能直接recv/read 嗎?不能,整個代碼,只有poll有資格檢測事件是否就緒// 將新的sock 托管給poll!// 將新的sock托管給poll本質,其實就是將sock,添加到fdarray數組中即可!int i = 0;// 找字符集中沒有被占用的位置for (; i < num; i++){if (_rfds[i].fd != defaultfd)continue;elsebreak;}if (i == num){logMessage(WARNING, "server if full, please wait");close(sock);}else{_rfds[i].fd = sock;_rfds[i].events = POLLIN;_rfds[i].revents = 0;}Print();logMessage(DEBUG, "Accepter out");}void Recver(int pos){logMessage(DEBUG, "in Recver");// 讀取requestchar buffer[1024];ssize_t s = recv(_rfds[pos].fd, buffer, sizeof(buffer) - 1, 0);if (s > 0){buffer[s] = 0;logMessage(NORMAL, "client# %s", buffer);}else if (s == 0){close(_rfds[pos].fd);ResetItem(pos);logMessage(NORMAL, "client quit");return;}else{close(_rfds[pos].fd);ResetItem(pos);logMessage(ERROR, "client quit: %s", strerror(errno));return;}// 2. 處理requeststd::string response = _func(buffer);// 3. 返回response// write bugwrite(_rfds[pos].fd, response.c_str(), response.size());logMessage(DEBUG, "out Recver");}// 1. handler event rfds 中,不僅僅是有一個fd是就緒的,可能存在多個// 2. 我們的poll目前只處理了read事件void HandlerReadEvent(){// 遍歷fdarray數組for (int i = 0; i < num; i++){// 過濾非法的fdif (_rfds[i].fd == defaultfd)continue;if (!(_rfds[i].events & POLLIN))continue;if (_rfds[i].fd == _listensock && (_rfds[i].revents & POLLIN))Accepter(_listensock);else if (_rfds[i].revents & POLLIN)Recver(i);else{}}}void start(){int timeout = -1;for (;;){int n = poll(_rfds, num, timeout);switch (n){case 0:logMessage(NORMAL, "timeout...");break;case -1:logMessage(WARNING, "poll error, code: %d, err string: %s", errno, strerror(errno));break;default:// 說明已經有事情就緒了logMessage(NORMAL, "have event ready!");HandlerReadEvent();break;}}}~PollServer(){if (_listensock < 0)close(_listensock);if (_rfds)delete[] _rfds;}private:int _port;int _listensock;struct pollfd *_rfds;func_t _func;};
}
為了解決pool的缺點,程序員們又設計出了npoll
三、epoll函數的基礎知識
按照man手冊的說法: 是為處理大批量句柄而作了改進的poll. 它是在2.5.44內核中被引進的(epoll(4) is a new API introduced in Linux kernel 2.5.44) 它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法
1、epoll的相關系統調用
epoll 有3個相關的系統調用.
epoll_create?創建一個epoll的句柄(創建了epoll模型)
int epoll_create(int size);
- 自從linux2.6.8之后,size參數是被忽略的.
- 用完之后, 必須調用close()關閉
?epoll_ct? :epoll的事件注冊函數
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event)
- 它不同于select()是在監聽事件時告訴內核要監聽什么類型的事件, 而是在這里先注冊要監聽的事件類型.
- 第一個參數是epoll_create()的返回值(epoll的句柄).
- 第二個參數表示動作,用三個宏來表示.
- 第三個參數是需要監聽的fd.
- 第四個參數是告訴內核需要監聽什么事件
?第二個參數的取值:(增改刪)
- EPOLL_CTL_ADD :注冊新的fd到epfd中;
- EPOLL_CTL_MOD :修改已經注冊的fd的監聽事件;
- EPOLL_CTL_DEL :從epfd中刪除一個fd;
?struct epoll_event結構如下
typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;
} epoll_data_t;struct epoll_event {uint32_t events; // 事件類型(輸入)epoll_data_t data; // 用戶數據(輸出)
};
poll_data_t
是一個聯合體,用于在 struct epoll_event
中傳遞用戶數據?
?events可以是以下幾個宏的集合:
- EPOLLIN : 表示對應的文件描述符可以讀 (包括對端SOCKET正常關閉);
- EPOLLOUT : 表示對應的文件描述符可以寫;
- EPOLLPRI : 表示對應的文件描述符有緊急的數據可讀 (這里應該表示有帶外數據到來); EPOLLERR : 表示對應的文件描述符發生錯誤;
- EPOLLHUP : 表示對應的文件描述符被掛斷;
- EPOLLET : 將EPOLL設為邊緣觸發(Edge Triggered)模式, 這是相對于水平觸發(Level Triggered)來說的.
- EPOLLONESHOT:只監聽一次事件, 當監聽完這次事件之后, 如果還需要繼續監聽這個socket的話, 需要 再次把這個socket加入到EPOLL隊列里.
epoll_wait:收集在epoll監控的事件中已經發送的事件
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
- ?參數events是分配好的epoll_event結構體數組.
- epoll將會把發生的事件賦值到events數組中 (events不可以是空指針,內核只負責把數據復制到這個 events數組中,不會去幫助我們在用戶態中分配內存).
- maxevents告之內核這個events有多大,這個 maxevents的值不能大于創建epoll_create()時的size.
- 參數timeout是超時時間 (毫秒,0會立即返回,-1是永久阻塞).
- 如果函數調用成功,返回對應I/O上已準備好的文件描述符數目,如返回0表示已超時,返回小于0表示函 數失敗
2、epoll工作原理
當某一進程調用epoll_create方法時,Linux內核會創建一個eventpoll結構體,這個結構體中有兩個成員與epoll的使用方式密切相關
- 每一個epoll對象都有一個獨立的eventpoll結構體,用于存放通過epoll_ctl方法向epoll對象中添加進來的事件
- 這些事件都會掛載在紅黑樹中,如此,重復添加的事件就可以通過紅黑樹而高效的識別出來(紅黑樹的插入時間效率是lgn,其中n為樹的高度)
- 而所有添加到epoll中的事件都會與設備(網卡)驅動程序建立回調關系,也就是說,當響應的事件發生時 會調用這個回調方法
- 這個回調方法在內核中叫ep_poll_callback,它會將發生的事件添加到rdlist雙鏈表中
struct eventpoll{ .... /*紅黑樹的根節點,這顆樹中存儲著所有添加到epoll中的需要監控的事件*/ struct rb_root rbr; /*雙鏈表中則存放著將要通過epoll_wait返回給用戶的滿足條件的事件*/ struct list_head rdlist; ....
}
在epoll中,對于每一個事件,都會建立一個epitem結構體.?
struct epitem{ struct rb_node rbn;//紅黑樹節點 struct list_head rdllink;//雙向鏈表節點 struct epoll_filefd ffd; //事件句柄信息 struct eventpoll *ep; //指向其所屬的eventpoll對象 struct epoll_event event; //期待發生的事件類型
}
- 當調用epoll_wait檢查是否有事件發生時,只需要檢查eventpoll對象中的rdlist雙鏈表中是否有epitem 元素即可.
- 如果rdlist不為空,則把發生的事件復制到用戶態,同時將事件數量返回給用戶. 這個操作的時間復雜度 是O(1).?
?總結一下, epoll的使用過程就是三部曲:
?
- 調用epoll_create創建一個epoll句柄;
- 調用epoll_ctl, 將要監控的文件描述符進行注冊;
- 調用epoll_wait, 等待文件描述符就緒
epoll的優點(和 select 的缺點對應)?
- 接口使用方便: 雖然拆分成了三個函數, 但是反而使用起來更方便高效. 不需要每次循環都設置關注的文件描述符, 也做到了輸入輸出參數分離開
- 數據拷貝輕量: 只在合適的時候調用 EPOLL_CTL_ADD 將文件描述符結構拷貝到內核中, 這個操作并不頻 繁(而select/poll都是每次循環都要進行拷貝)
- 事件回調機制: 避免使用遍歷, 而是使用回調函數的方式, 將就緒的文件描述符結構加入到就緒隊列中, epoll_wait 返回直接訪問就緒隊列就知道哪些文件描述符就緒. 這個操作時間復雜度O(1). 即使文件描述 符數目很多, 效率也不會受到影響.
- 沒有數量限制: 文件描述符數目無上限
epoll工作方式?
感性理解
你是一個網癮少年,?天天就喜歡呆在自己的房間玩游戲,你媽飯做好了, 喊你吃飯的時候有兩種方式:
- 1. 如果你媽喊你一次, 你沒動, 那么你媽會繼續喊你第二次, 第三次...,還有一種可能是,你吃了一口,又繼續去玩,你媽過一會又會開始喊你吃飯,直到你下來把飯吃完(?水平觸發)
- 2. 如果早上你媽喊你一次, 你沒動你媽就不管你了,到了下午又吃飯了,你媽又會叫你一次,沒來你媽也不管你,到了晚上...(邊緣觸發)
epoll有2種工作方式-水平觸發(LT)和邊緣觸發(ET)
水平觸發Level Triggered 工作模式
epoll默認狀態下就是LT工作模式:
- 當epoll檢測到socket上事件就緒的時候, 可以不立刻進行處理. 或者只處理一部分.
- 如上面的例子(你媽喊你吃飯類似), 由于只讀了1K數據, 緩沖區中還剩1K數據, 在第二次調用 epoll_wait 時, epoll_wait 仍然會立刻返回并通知socket讀事件就緒.
- 直到緩沖區上所有的數據都被處理完, epoll_wait 才會立刻返回.
- 支持阻塞讀寫和非阻塞讀?
簡單點來說只要底層數據沒有讀完就,epoll就會一直通知用戶要讀取數據LT
邊緣觸發Edge Triggered工作模式?
如果我們在第1步將socket添加到epoll描述符的時候使用了EPOLLET標志, epoll進入ET工作模式
- 當epoll檢測到socket上事件就緒時, 必須立刻處理.
- 如上面的例子(你媽喊你吃飯類似), 雖然只讀了1K的數據, 緩沖區還剩1K的數據, 在第二次調用 epoll_wait 的時候, epoll_wait 不會再返回了.
- 也就是說, ET模式下, 文件描述符上的事件就緒后, 只有一次處理機會.
- ET的性能比LT性能更高( epoll_wait 返回的次數少了很多). Nginx默認采用ET模式使用epoll.
- 只支持非阻塞的讀寫
ET就是底層數據沒有讀完,epoll也不會通知用戶在去讀取數據,除非底層數據變化的時候(增多),才會在通知用戶一次。?
對于ET模式的效率是非常高的,因為對于epoll在此模式下只有底層數據變化了才會通知用戶去讀數據,但是我們不知道數據讀完了,所以就會倒逼著用戶將本輪就緒的數據全部讀取到上層(循環讀取),所說,一般的fd是阻塞式的,但是在ET模式下的fd必須是非阻塞式的。
倒逼著用戶將本輪就緒的數據全部讀取到上層體現:
- 不僅僅在通知機制上,盡快讓上層把數據都讀走。
- 也讓T CP可以給對方發生提供了一個更大的窗口大小,讓對方更新出更大的滑動窗口
- 讓底層的數據發送效率更好,其中TCP6位標志位中的PUH提示接收端應用程序立刻從TCP緩沖區把數據讀走 。
?select和poll其實也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET。
四、epoll服務器?
epollServer.hpp
#pragma once#include <iostream>
#include <string>
#include <cstring>
#include <functional>
#include <sys/epoll.h>
#include "err.hpp"
#include "log.hpp"
#include "sock.hpp"namespace epoll_ns
{static const int defaultport = 8888;static const int size = 128;static const int defaultvalue = -1;static const int defalultnum = 64;using func_t = std::function<std::string(const std::string &)>;class EpollServer{public:EpollServer(func_t f, uint16_t port = defaultport, int num = defalultnum): func_(f), _num(num), _revs(nullptr), _port(port), _listensock(defaultvalue), _epfd(defaultvalue){}void initServer(){_listensock = Sock::Socket();Sock::Bind(_listensock, _port);Sock::Listen(_listensock);// 1 創建epoll模型_epfd = epoll_create(size);if (_epfd < 0){logMessage(FATAL, "epoll create error: %s", strerror(errno));exit(EPOLL_CREATE_ERR);}// 2 添加listensocket到epoll中struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = _listensock;epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock, &ev);// 3 申請就緒事情的空間_revs = new struct epoll_event[_num];logMessage(NORMAL, "init server success!");}void HandlerEvent(int readyNum){logMessage(DEBUG, "HandlerEvent in");for (int i = 0; i < readyNum; i++){uint32_t events = _revs[i].events;int sock = _revs[i].data.fd;if (sock == _listensock && (events & EPOLLIN)){//_listensock讀數據就緒,獲取新連接std::string clinetip;uint16_t clinetport;int fd = Sock::Accept(sock, &clinetip, &clinetport);if (fd < 0){logMessage(WARNING, "accept error");continue;}// 獲取fd成功,放入到epoll中等struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = fd;epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);}else if (events & EPOLLIN){// 普通事情準備好了char buffer[1024];// 這里是有BUG的這里不能保證讀取是完整信息,這里先不解決int n = recv(sock, buffer, sizeof(buffer) - 1, 0);if (n > 0){buffer[n] = 0;logMessage(DEBUG, "client# %s", buffer);// 應答std::string respose = func_(buffer);send(sock, respose.c_str(), respose.size(), 0);}else if (n == 0){// 客戶端退出了epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(NORMAL, "client quit");}else{epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr);close(sock);logMessage(ERROR, "recv error, code: %d, errstring: %s", errno, strerror(errno));}}else{}}logMessage(DEBUG, "HandlerEvent out");}void start(){int timeout = -1;for (;;){int n = epoll_wait(_epfd, _revs, _num, timeout);switch (n){case 0:logMessage(NORMAL, "timeout...");case -1:logMessage(WARNING, "epoll_wait failed,code:%d,errstring: %s", errno, strerror(errno));// 到這里事件都就緒了default:logMessage(NORMAL, "have event ready");HandlerEvent(n);break;}}}~EpollServer(){if (_listensock != defaultvalue)close(_listensock);if (_epfd != defaultvalue)close(_epfd);if (_revs)delete[] _revs;}private:uint16_t _port;int _listensock;int _epfd;struct epoll_event *_revs;int _num;func_t func_;};
}
?在使用 telnet
命令連接到服務器后,你可以通過幾種方式來退出連接:
發送 Telnet 命令序列:
- 在 telnet 連接中,你可以發送一些特殊的 Telnet 命令來結束連接。
- 在連接中直接輸入?
Ctrl+]
,然后輸入?quit
?或者?exit
,然后按回車鍵。直接關閉 telnet 客戶端:
- 如果你不介意強制終止連接,可以直接關閉或者終止 telnet 客戶端。
- 在大多數系統中,你可以使用?
Ctrl+C
?或者?Ctrl+D
?來中斷當前運行的命令或程序。在這種情況下,這將關閉 telnet 客戶端并且終止連接。