目錄
一.接口
1.1epoll_creaet?
1.2epoll_ctl
1.3epoll_wait
?二.細節問題
2.1 工作原理
2.2 epoll的demo
2.3 epoll的優點
三. LT 與 ET模式
理解ET
四. reactor
一.接口
1.1epoll_creaet?
注意返回值是一個文件描述符?
?創建一個epoll模型
1.2epoll_ctl
返回值:?
?
第一個參數是epoll_create的返回值
第二個參數表示動作,用三個宏來表示.
第三個參數是需要監聽的 fd.
第四個參數是告訴內核需要監聽什么事.
第二個參數的取值:
? EPOLL_CTL_ADD:注冊新的 fd 到 epfd 中;
? EPOLL_CTL_MOD:修改已經注冊的 fd 的監聽事件;
? EPOLL_CTL_DEL:從 epfd 中刪除一個 fd;??
第四個參數:
? EPOLLIN : 表示對應的文件描述符可以讀 (包括對端 SOCKET 正常關閉);
? EPOLLOUT : 表示對應的文件描述符可以寫;
? EPOLLPRI : 表示對應的文件描述符有緊急的數據可讀 (這里應該表示有帶外 數據到來);
? EPOLLERR : 表示對應的文件描述符發生錯誤;
? EPOLLHUP : 表示對應的文件描述符被掛斷;
? EPOLLET : 將 EPOLL 設為邊緣觸發(Edge Triggered)模式, 這是相對于水平 觸發(Level Triggered)來說的.
? EPOLLONESHOT:只監聽一次事件, 當監聽完這次事件之后, 如果還需要繼 續監聽這個 socket 的話, 需要再次把這個 socket 加入到 EPOLL 隊列里
1.3epoll_wait
? epoll 將會把發生的事件賦值到 events 數組中 (events 不可以是空指針,內核 只負責把數據復制到這個 events 數組中,不會去幫助我們在用戶態中分配內存).
? maxevents 告之內核這個 events 有多大,這個 maxevents 的值不能大于創建 epoll_create()時的 size.
? 參數 timeout 是超時時間 (毫秒,0 會立即返回,-1 是永久阻塞).
? 如果函數調用成功,返回對應 I/O 上已準備好的文件描述符數目,如返回 0 表 示已超時, 返回小于 0 表示函數失敗.?
?二.細節問題
epoll模型實際上就是三種東西,紅黑樹,就緒隊列,回調機制。epoll_ctl實際上就是維護紅黑樹的,用戶告訴內核,要求內核幫我去關心哪些fd。epoll_wait就是內核告訴用戶,哪一個fd上面的某些事情已經就緒了。
2.1 工作原理
細節:epoll_ctl的作用:向紅黑樹中插入節點,向底層回調注冊回調方法。
當某一進程調用 epoll_create 方法時,Linux 內核會創建一個 eventpoll 結構 體,這個結構體中有兩個成員與 epoll 的使用方式密切相關.
struct eventpoll{..../*紅黑樹的根節點,這顆樹中存儲著所有添加到 epoll 中的需要監控的事件*/struct rb_root rbr;/*雙鏈表中則存放著將要通過 epoll_wait 返回給用戶的滿足條件的事件*/struct list_head rdlist;....
};
? 每一個 epoll 對象都有一個獨立的 eventpoll 結構體,用于存放通過 epoll_ctl 方 法向 epoll 對象中添加進來的事件.
? 這些事件都會掛載在紅黑樹中,如此,重復添加的事件就可以通過紅黑樹而高 效的識別出來(紅黑樹的插入時間效率是 lgn,其中 n 為樹的高度).
? 而所有添加到 epoll 中的事件都會與設備(網卡)驅動程序建立回調關系,也就是 說,當響應的事件發生時會調用這個回調方法.
? 這個回調方法在內核中叫 ep_poll_callback,它會將發生的事件添加到 rdlist 雙 鏈表中.
? 在 epoll 中,對于每一個事件,都會建立一個 epitem 結構體.
struct epitem{struct rb_node rbn;//紅黑樹節點struct list_head rdllink;//雙向鏈表節點struct epoll_filefd ffd; //事件句柄信息struct eventpoll *ep; //指向其所屬的 eventpoll 對象struct epoll_event event; //期待發生的事件類型
}
? 當調用 epoll_wait 檢查是否有事件發生時,只需要檢查 eventpoll 對象中的 rdlist 雙鏈表中是否有 epitem 元素即可.(有事件就緒了,提前注冊的回調機制會自動的把紅黑樹的節點添加到雙鏈表中)
? 如果 rdlist 不為空,則把發生的事件復制到用戶態,同時將事件數量返回給用 戶. 這個操作的時間復雜度是 O(1).
2.2 epoll的demo
#pragma once#include <iostream>
#include <memory>
#include <unistd.h>
#include <cstring>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Common.hpp"using namespace SocketModule;class EpollServer
{const static int size = 64;const int defaultfd = -1;public:EpollServer(int port) : _listensock(std::make_unique<TcpSocket>()), _isruning(false), _epfd(defaultfd){// 1.創建listensocket_listensock->BuildTcpSocketMethod(port);// 2.創建epoll模型_epfd = epoll_create(256);if (_epfd < 0){LOG(LogLevel::FATAL) << "epoll_create error...";exit(EPOLL_CREATE_ERR);}LOG(LogLevel::FATAL) << "epoll_create success , epfd: " << _epfd;// 3.將listensocket設置到內核中struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = _listensock->Fd();int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock->Fd(), &ev);if (n < 0){LOG(LogLevel::FATAL) << "add listensocket failed";exit(EPOLL_CTL_ERR);}}void Start(){int timeout = -1;_isruning = true;while (true){int n = epoll_wait(_epfd, revs, size, timeout);switch (n){case 0:LOG(LogLevel::DEBUG) << "timeout...";break;case -1:LOG(LogLevel::ERROR) << "epoll error";break;default:Dispatcher(n);break;}}}void Dispatcher(int n){for (int i = 0; i < n; i++){if (revs[i].events & EPOLLIN){if (revs[i].data.fd == _listensock->Fd()){// 新鏈接到來Accepter();}else{Recver(i);}}}}void Accepter(){InetAddr client;int sockfd = _listensock->Accept(&client); // 這里一定不會阻塞,等和拷貝分離了if (sockfd >= 0){// 獲取新鏈接成功LOG(LogLevel::INFO) << "get a new link , sockfd: " << sockfd << "client is: " << client.StringAddr();struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = sockfd;int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &ev);if (n < 0){LOG(LogLevel::WARING) << "add socket failed";}else{LOG(LogLevel::INFO) << "add socket success";}}}void Recver(int pos){// recv的時候肯定也不會阻塞char buffer[1024];ssize_t n = recv(revs[pos].data.fd, buffer, sizeof(buffer) - 1, 0); // 這樣寫是有bug的,tcp是面向字節流的if (n > 0){buffer[n] = 0;std::cout << "client say& " << buffer << std::endl;}else if (n == 0){LOG(LogLevel::DEBUG) << "client quit";// 不讓epoll關心這個fd了int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, revs[pos].data.fd, nullptr);if (n < 0){LOG(LogLevel::FATAL) << "del socket failed";exit(EPOLL_CTL_ERR);}close(revs[pos].data.fd); // 先移除,在關閉}else{LOG(LogLevel::ERROR) << "recv error";// 不讓epoll關心這個fd了int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, revs[pos].data.fd, nullptr);if (n < 0){LOG(LogLevel::FATAL) << "del socket failed";exit(EPOLL_CTL_ERR);}close(revs[pos].data.fd); // 先移除,在關閉}}~EpollServer(){_listensock->Close();if (_epfd > 0)close(_epfd);}private:std::unique_ptr<Socket> _listensock;bool _isruning;int _epfd;struct epoll_event revs[size];
};
2.3 epoll的優點
? 接口使用方便: 雖然拆分成了三個函數, 但是反而使用起來更方便高效. 不需要 每次循環都設置關注的文件描述符, 也做到了輸入輸出參數分離開
? 數據拷貝輕量: 只在合適的時候調用 EPOLL_CTL_ADD 將文件描述符結構拷貝到 內核中, 這個操作并不頻繁(而 select/poll 都是每次循環都要進行拷貝)
? 事件回調機制: 避免使用遍歷, 而是使用回調函數的方式, 將就緒的文件描述符 結構加入到就緒隊列中, epoll_wait 返回直接訪問就緒隊列就知道哪些文件描述符就 緒. 這個操作時間復雜度 O(1). 即使文件描述符數目很多, 效率也不會受到影響.
? 沒有數量限制: 文件描述符數目無上限.
三. LT 與 ET模式
理解ET
使用 ET 模式的 epoll, 需要將文件描述設置為非阻塞. 這個不是接口上的要求, 而是 "工 程實踐" 上的要求. 假設這樣的場景: 服務器接收到一個 10k 的請求, 會向客戶端返回一個應答數據. 如果客 戶端收不到應答, 不會發送第二個 10k 請求.
如果服務端寫的代碼是阻塞式的 read, 并且一次只 read 1k 數據的話(read 不能保證一 次就把所有的數據都讀出來, 參考 man 手冊的說明, 可能被信號打斷), 剩下的 9k 數據 就會待在緩沖區中.
此時由于 epoll 是 ET 模式, 并不會認為文件描述符讀就緒. epoll_wait 就不會再次返 回. 剩下的 9k 數據會一直在緩沖區中. 直到下一次客戶端再給服務器寫數據. epoll_wait 才能返回
但是問題來了.
? 服務器只讀到 1k 個數據, 要 10k 讀完才會給客戶端返回響應數據.
? 客戶端要讀到服務器的響應, 才會發送下一個請求
? 客戶端發送了下一個請求, epoll_wait 才會返回, 才能去讀緩沖區中剩余的數 據.
所以, 為了解決上述問題(阻塞 read 不一定能一下把完整的請求讀完), 于是就可以使用 非阻塞輪訓的方式來讀緩沖區, 保證一定能把完整的請求都讀出來.而如果是 LT 沒這個問題. 只要緩沖區中的數據沒讀完, 就能夠讓 epoll_wait 返回文件 描述符讀就緒.
四. reactor
Reactor.hpp
#pragma once#include <iostream>
#include <memory>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"class Reactor
{
private:static const int revs_num = 128;bool IsConnectionExists(std::shared_ptr<Connection> &conn){return IsConnectionExistsHelper(conn->GetSockFd());}bool IsConnectionExists(int sockfd){return IsConnectionExistsHelper(sockfd);}bool IsConnectionExistsHelper(int sockfd){auto iter = _connections.find(sockfd);if (iter == _connections.end()){return false;}return true;}bool IsConnectionEmpty(){return _connections.empty();}public:Reactor() : _epoll_ptr(std::make_unique<Epoller>()), _isruning(false){}void Start(){if (IsConnectionEmpty()){return;}_isruning = true;while (true){PrintConnection();int n = _epoll_ptr->Wait(_revs, revs_num, -1);for (int i = 0; i < n; i++){int sockfd = _revs[i].data.fd;uint32_t revents = _revs[i].events;//將所有的異常處理轉換為IO錯誤if(revents & EPOLLERR)revents |= (EPOLLIN | EPOLLOUT);//只要是出錯了就打開讀寫端if(revents & EPOLLHUP)revents |= (EPOLLIN | EPOLLOUT);if(revents & EPOLLIN){//不用區分異常了,因為統一處理//不用區分是listensocket還是普通事件就緒if(IsConnectionExists(sockfd))_connections[sockfd]->Recver();}if(revents & EPOLLOUT){if(IsConnectionExists(sockfd))_connections[sockfd]->Sender();}}}_isruning = false;}void AddNewConnection(std::shared_ptr<Connection> &conn){if (IsConnectionExists(conn)){LOG(LogLevel::WARING) << "conn is exists: " << conn->GetSockFd();return;}// 1.把conn對應的fd和他關心的事件寫到內核uint32_t events = conn->GetEvent();int sockfd = conn->GetSockFd();_epoll_ptr->Add(sockfd, events);// *.設置回指指針conn->SetOwner(this);// 2.把connection對象添加到connections內部_connections[sockfd] = conn;}void EnableReadWrite(int sockfd,bool enableread,bool enablewrite){// 不要重復添加if (!IsConnectionExists(sockfd)){LOG(LogLevel::WARING) << "EnableReadWrite: conn is exists: " << sockfd;return;}// 修改當前的sockfd對應的connection關心的事件uint32_t events = (EPOLLET | (enableread ? EPOLLIN : 0) | (enablewrite ? EPOLLOUT : 0));_connections[sockfd]->SetEvent(events);//再去寫透到內核中_epoll_ptr->Mod(sockfd,events);}void DelConnection(int sockfd){_epoll_ptr->Del(sockfd);_connections.erase(sockfd);close(sockfd);}void PrintConnection(){std::cout << "當前正在管理的fd:" << std::endl;for(auto &conn : _connections){std::cout << conn.second->GetSockFd() << " ";}std::cout << "\r\n";}~Reactor(){}private:// 1.epoll模型std::unique_ptr<Epoller> _epoll_ptr;// // 2.listensocket 單獨封裝管理// std::shared_ptr<Listener> _listener_ptr;// 3.每一個fd都需要一個單獨的輸入輸出緩沖區,管理套接字std::unordered_map<int, std::shared_ptr<Connection>> _connections;// 4.就緒的所有事件struct epoll_event _revs[revs_num];bool _isruning;
};
Listener.hpp
#pragma once#include <iostream>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Channel.hpp"
#include "Connection.hpp"using namespace SocketModule;class Listener : public Connection
{
public:Listener(int port = 8080) : _port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildTcpSocketMethod(_port);SetEvent(EPOLLIN | EPOLLET);SetNonBlock(_listensock->Fd());}int GetSockFd(){return _listensock->Fd();}void Recver() override{InetAddr client;//雖然是新鏈接到來了,但是只有一個鏈接嗎,//while,ET,設置fd為非阻塞while (true){int sockfd = _listensock->Accept(&client);if(sockfd == ACCEPT_ERR){break;}else if(sockfd == ACCEPT_CONTINUE){continue;}else if(sockfd == ACCEPT_DONE){break;}else{//是一個合法的fd,但是怎么去添加到_connections里?需要回調指針std::shared_ptr<Connection> conn = std::make_shared<Channel>(sockfd,client);conn->SetEvent(EPOLLIN | EPOLLET);if(_handler != nullptr)conn->RegisterHandler(_handler);GetOwner()->AddNewConnection(conn);}}}// std::string& Inbuffer() override// {}// std::string& AppendOutBuffer(std::string& out) override// {}void Sender() override{}void Excepter() override{}~Listener(){}private:int _port;std::unique_ptr<Socket> _listensock;
};
Channel.hpp
#pragma once#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <functional>
#include "Common.hpp"
#include "Connection.hpp"
#include "InetAddr.hpp"#define SIZE 1024class Channel : public Connection
{
public:Channel(int sockfd,const InetAddr& client) : _sockfd(sockfd),_client_addr(client){SetNonBlock(sockfd);}int GetSockFd(){return _sockfd;}//保證把本輪數據讀完 (while循環)//即便是讀完了,怎么知道數據由完整的報文,如果是多個報文呢?(協議)void Recver() override{char buffer[SIZE];while(true){buffer[0] = 0;ssize_t n = recv(_sockfd,buffer,sizeof(buffer) - 1, 0);if(n > 0){buffer[n] = 0;_inbuffer += buffer;}else if(n == 0){Excepter();return;}else{if(errno == EAGAIN || errno == EWOULDBLOCK){break;//本輪數據讀完了}else if(errno == EINTR){continue;}else{Excepter();return;}}}LOG(LogLevel::DEBUG) << _inbuffer;if(!_inbuffer.empty()){// _handler(std::shared_ptr<Connection>(this));_outbuffer += _handler(_inbuffer);}if(!_outbuffer.empty()){Sender();// GetOwner()->EnableReadWrite(_sockfd,true,true);}}// std::string& Inbuffer() override// {// return _inbuffer;// }// std::string& AppendOutBuffer(std::string& out) override// {// _outbuffer += out;// return _outbuffer;// }void Sender() override{while (true){ssize_t n = send(_sockfd,_outbuffer.c_str(),_outbuffer.size(),0);//非阻塞發if(n > 0){_outbuffer.erase(0,n);if(_outbuffer.empty())break;}else if(n == 0){break;}else{if(errno == EAGAIN || errno == EWOULDBLOCK){break;}else if(errno == EINTR){continue;}else{Excepter();return;}}}// 1.數據發送完畢// 2.發送條件不具備if(!_outbuffer.empty()){// 開啟對寫事件的關心GetOwner()->EnableReadWrite(_sockfd,true,true);}else{GetOwner()->EnableReadWrite(_sockfd,true,false);}}void Excepter() override{GetOwner()->DelConnection(_sockfd);}~Channel(){}private:int _sockfd;std::string _inbuffer;std::string _outbuffer;InetAddr _client_addr;// handler_t _handler;
};
Connection.hpp
#pragma once#include <iostream>
#include <string>
#include "InetAddr.hpp"class Reactor;
class Connection;using handler_t = std::function<std::string (std::string &)>;class Connection
{
public:Connection():_owner(nullptr),_events(0){}virtual void Recver() = 0;virtual void Sender() = 0;virtual void Excepter() = 0;virtual int GetSockFd() = 0;// virtual std::string& Inbuffer() = 0;// virtual std::string& AppendOutBuffer(std::string& out) = 0;void RegisterHandler(handler_t handler){_handler = handler;}void SetEvent(const uint32_t &events){_events = events;}uint32_t GetEvent(){return _events;}void SetOwner(Reactor *owner){_owner = owner;}Reactor *GetOwner(){return _owner;}~Connection(){}private:// 回指指針,用于listensocket添加普通套接字Reactor *_owner;// 關心事件uint32_t _events;
public:handler_t _handler;
};
Main.cc
#include <iostream>
#include "Log.hpp"
#include "Reactor.hpp"
#include "Listener.hpp"
#include "Protocol.hpp"
#include "NetCal.hpp"static void Usage(std::string proc)
{std::cerr << "Usage: " << proc << " port" << std::endl;
}int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);exit(USAGE_ERR);}Enable_Console_Log_Strtegy();int port = std::stoi(argv[1]);// 構建業務模塊std::shared_ptr<Cal> cal = std::make_shared<Cal>();// 構建協議對象std::shared_ptr<Protocol> protocal = std::make_shared<Protocol>([&cal](Request& req) -> Response{return cal->Execute(req);});// 構建listener對象std::shared_ptr<Connection> conn = std::make_shared<Listener>(port);conn->RegisterHandler([&protocal](std::string& inbuffer)->std::string{std::string response_str;//可能不止一個報文while(true){std::string package;if(!protocal->Decode(inbuffer,&package))break;response_str += protocal->Execute(package);}return response_str;});std::unique_ptr<Reactor> tsvr = std::make_unique<Reactor>();tsvr->AddNewConnection(conn);tsvr->Start();return 0;
}