🔥個人主頁🔥:孤寂大仙V
🌈收錄專欄🌈:計算機網絡
🌹往期回顧🌹:【計算機網絡】非阻塞IO——epoll 編程與ET模式詳解——(easy)高并發網絡服務器設計
🔖流水不爭,爭的是滔滔不息
- 一、Reactor模式簡介
- Reactor 模式整體過程
- Reactor 模式優點
- 二、Reactor 模式網絡框架實現詳解
- Reactor整體結構
- Reactor(反應堆)
- Epoller(epoll封裝)
- Listener(監聽器)
- Channel(每條 TCP 連接)
- 數據流轉時序解讀(舉個例子)
- 代碼實現
- Reactor.hpp
- Epoller.hpp
- Connection.hpp
- Listener.hpp 和 Channel.hpp
- Main.cc
一、Reactor模式簡介
Reactor 模式是網絡編程中最為典型且最實用的一種事件處理模式,廣泛應用于高速網絡服務器中(比如 Nginx、Redis、Netty 等),旨在實現事件驅動下的非阻塞 I/O,從而用少量線程處理海量連接,大幅提升并發能力。
Reactor 模式整體過程
Reactor 模式主要分為以下幾個環節:
-
事件源(Event Source)
由操作系統提供,如socket、文件、時鐘等。這些事件源發生時,會準備好數據或者發生條件。 -
事件分離器(Demultiplexer, Epoll/Select 等)
負責監控一組事件源,若其中有準備好進行I/O的數據時,會將它們分離出去,告知Reactor進行處理。
本實現中我們借助 epoll 進行事件檢測,epoll為我們提供高速且靈活的I/O事件通知。 -
事件分發器(Reactor)
Reactor根據事件發生時哪個文件描述符準備好,分發給事先為每個文件準備好的事件處理器(Handlers)進行處理。
本實現中,Reactor為每一個連接綁定了對應的handler(比如說 Connection 類),由handler進行數據讀寫。 -
事件處理器(Handlers)
事件處理器負責對發生的事件進行具體的數據操作。比如說:
讀緩沖區的數據。
進行業務邏輯處理。
再寫回到緩沖區中。
本實現中(下面代碼),Handlers 就是 Connection 的子類,如 Listener(負責接受新連接)、Channel(負責讀寫數據)等。
Reactor 模式的數據流
整體的數據流可以簡述為:
事件源 -> epoll 等事件分離器 -> Reactor -> 事件處理器 -> 讀/寫緩沖區 -> 業務處理 -> 發送出去
Reactor 模式優點
- 資源消耗少: 單個Reactor中只需要少量線程即可處理數千甚至數萬個并發連接。
- 擴展性強: 適用從中、小到大型網絡服務。
- 解耦: 事件分離器、事件分發器、事件處理器之間高度解耦,方便代碼擴展和測試。
- 非阻塞: 依賴非阻塞I/O,避免資源浪費和時間消耗。
有興趣可以了解這篇文章——Reactor模式
英文原版——reactor模式
二、Reactor 模式網絡框架實現詳解
近幾年來,隨著并發量增加,Reactor 模式憑借非阻塞 I/O + 事件通知成為實現高速網絡服務時最優且最穩定的方案。這種模式主體上依賴 Linux 的 epoll 等內核機制進行事件分發,做到用少量線程處理很多連接。 讓我帶你看看你實現出的這個網絡框架是怎樣工作的。
Reactor整體結構
Reactor(反應堆)
負責事件循環(poll)、分發和管理每條連接。
class Reactor {// 負責:// 1. epoll_create// 2. epoll_wait 等待事件發生// 3. 依序分發給每條 Connection 進行處理// 4. 新連接時進行添加// 5. 斷線時進行清理
}
Epoller(epoll封裝)
對 epoll_create、epoll_ctl、epoll_wait 進行封裝,為 Reactor 減少實現負擔
class Epoller {// 負責:// 1. epoll_create// 2. epoll_ctl ADD/MOD/DEL// 3. epoll_wait 等待事件發生
}
Listener(監聽器)
負責對listen socket進行準備(綁定、listen、非阻塞設置),有新連接時為每條連接封裝為 Connection,再交給 Reactor。
class Listener : public Connection {// 負責:// 1. 綁定、listen// 2. Accept 新連接// 3. 設置為非阻塞// 4. 交由 Reactor 管理
}
Channel(每條 TCP 連接)
封裝每條 TCP 連接,負責數據緩沖、讀寫和異常處理。
class Channel : public Connection {// 負責:// 1. recv 讀數據到緩沖區// 2. send 發送緩沖區的數據出去// 3. EPOLLIN 就讀// 4. EPOLLOUT 就寫
}
數據流轉時序解讀(舉個例子)
比如說:
有一個Client連到Server。
Reactor 通過 epoll_wait 等到此時有一個 EPOLLIN 事件。
這時 Reactor 就會:檢查是不是 Listening socket。
-> 若是,則由 Listener 進行 Accept()。
-> 若不是,則由對應 Connection (比如 Channel) 進行 recv()。
Connection 讀到數據后,會進入自己的緩沖區。
再由你為每條 Connection 設置好handler進行業務處理。
處理完后若需要發送出去,則設置為 EPOLLOUT,由 Reactor 繼續關注此 socket 的寫事件。
Reactor 就是事件分發器,它自己不干活(比如說不存在自己去讀數據、寫數據),而是根據事件去通知對應的數據源(比如說 socket)由它自己進行處理。
讓我舉個例子,你可以把它想象為高速公路收費站:
- 收費站(Reactor):負責分流每一條道路上發生的小汽車。
- 每一條道路(socket):可以發生不同的事件(有車到達,需要付費)。
- 收費員(handler):為每一條道路提供服務(比如收錢、找零)。
Reactor 典型工作過程:
- epoll_wait 等待事件:
- Reactor 通過 epoll_wait 等待哪個 socket 有事件發生。
- 可能發生的事件包括:
有新連接到達(listen socket)
有現有連接有數據可以讀(client socket)
有緩沖區可以寫出去(client socket)
有異常發生(比如對方斷掉)
- 分發事件:
-
Reactor 檢查哪個文件發生了哪個事件。
-
若是listen socket 有 EPOLLIN 事件:
-> 就由 Listener 執行 Accept() ,獲取到 新的 Connection。
-> 新 Connection 就被加入到 Reactor 的關注表中。 -
若是現有 Connection 有 EPOLLIN 事件:
-> 就由對應 Connection 執行 recv() ,讀出數據到緩沖區。
-> 再由 Connection 綁定的數據處理器(handler)進行業務邏輯處理。 -
若是需要發送出去的數據準備好:
-> 就可以為 Connection 設置 EPOLLOUT ,由 Reactor 關注到時進行 send() 發送出去。
代碼實現
Reactor.hpp
#pragma once
#include <iostream>
#include <memory>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"
#include "Log.hpp"using namespace std;
using namespace LogModule;class Reactor // 基座 Reactor反應堆模式高并發網絡服務器
{static const int size = 128;
private:bool IsAddConnectionHelper(int sockfd) // 判斷_connections中是否存在connection{auto iter = _connections.find(sockfd);if (iter == _connections.end())return false;elsereturn true;}bool IsAddConnectionExist(const shared_ptr<Connection> &conn){return IsAddConnectionHelper(conn->GetSockFd());}bool IsAddConnectionExist(int sockfd){return IsAddConnectionHelper(sockfd);}bool IsConnectionEmpty() //判斷_connections是否為空{return _connections.empty();}int LoopOnce(int timeout){int n = _epoller_ptr->WaitEvents(_revs, size, timeout); // 獲取就緒隊列中的就緒事件return n;} void Dispatcher(int n){for (int i = 0; i < n; i++){int sockfd = _revs[i].data.fd; // 拿到就緒隊列中就緒的文件描述符uint32_t events = _revs[i].events; // 拿到就緒隊列中的事件if (events & EPOLLERR) // 就緒事件這里不管出什么錯都進EPOLLIN|EPOLLOUT讓具體的異常處理進行處理events |= (EPOLLIN | EPOLLOUT);if (events & EPOLLHUP)events |= (EPOLLIN | EPOLLOUT);if (events & EPOLLIN){// 讀事件就緒if (IsAddConnectionExist(sockfd))_connections[sockfd]->Recver();//索引_connection中connection對象}if (events & EPOLLOUT){// 寫事件就緒if (IsAddConnectionExist(sockfd))_connections[sockfd]->Sender();}}}public:Reactor(): _epoller_ptr(make_unique<Epoller>()),_isrunning(false){memset(_revs, 0, sizeof(_revs)); // 初始化 _revs}void Loop(){if (IsConnectionEmpty())return;_isrunning = true;int timeout=-1;while (_isrunning){PrintConnection();int n=LoopOnce(timeout); //獲取就緒隊列就緒事件個數Dispatcher(n); //派發,讀事件||寫事件}_isrunning = false;}// 把所有新連接添加到_connections并把事件寫到內核void AddConnection(shared_ptr<Connection> &conn){if (IsAddConnectionExist(conn)){LOG(LogLevel::WARNING) << "connection is exist" << conn->GetSockFd();return;}uint32_t events = conn->GetEvent(); // 一個connection中的事件int sockfd = conn->GetSockFd(); // 一個connection中的文件描述符_epoller_ptr->AddEvent(sockfd, events); // 寫入內核conn->SetOwner(this); //設置回指指針把reactor對象設置到connection基類_connections[sockfd] = conn; //把connection放到_connections中}//關心寫事件,關寫事件的connection放進_connections中就又走那個派發邏輯了,然后再去發數據(妙!)void EnableReadWrite(int sockfd,bool enableread,bool enablewrit){if(!IsAddConnectionExist(sockfd)){LOG(LogLevel::WARNING) << "connection is not exist" << sockfd;return;}uint32_t new_event=(EPOLLET | (enableread ? EPOLLIN : 0) | (enablewrit ? EPOLLOUT : 0));_connections[sockfd]->SetEvent(new_event);_epoller_ptr->ModEvent(sockfd,new_event);}//異常處理(關閉文件描述符)void DelConnection(int sockfd) {_epoller_ptr->DelEvent(sockfd); //移除關心事件_connections.erase(sockfd); //_connections移除自己close(sockfd); //關閉文件描述符LOG(LogLevel::DEBUG)<<"client quit";}void PrintConnection(){std::cout << "當前Reactor正在進行管理的fd List:";for(auto &conn : _connections){std::cout << conn.second->GetSockFd() << " ";}std::cout << "\r\n";}~Reactor(){}private:unique_ptr<Epoller> _epoller_ptr; // 構造epoll模型unordered_map<int, shared_ptr<Connection>> _connections; // 管理每一個connectionbool _isrunning; // 判斷是否啟動struct epoll_event _revs[size]; // 就緒隊列就緒事件
};
_epoll_ptr是對epoll類型的封裝,Reactor是負責統籌兼顧的事件分發器。獲取就緒隊列中的就緒事件,然后根據事件的不同去進行派發。
整個reactor模式有connection管理一套fd及其相對應的事件。為了在reactor中進行管理,讓文件描述符和connection建立哈希映射是_connections。listen套接字或者普通套接字的關心事件都會寫入connection所以在添加到_connection之前要拿到這個文件描述符關心的事件并寫入內核,然后放入hash表中。后面進行事件派發的時候是根據就緒隊列中的文件描述符對_connections哈希表中的對應的寫事件就緒或者讀事件就緒的connection(listen或者channel)進行索引。然后根據不同的事件去走不同的邏輯。
若 sockfd 對應 Listening ,則 Recv() 就是去 Accept() 新連接。
若 sockfd 對應 Client ,則 Recv() 就是去讀數據。
若需要 Sender() ,則是對 Client 進行寫緩沖的數據發送出去。
異常處理這里就是移除關心的事件然后在_connections中移除索引然后關閉文件描述符。
Epoller.hpp
#pragma once
#include <iostream>
#include <sys/epoll.h>
#include "Log.hpp"
#include "Common.hpp"using namespace std;
using namespace LogModule;class Epoller
{
public:Epoller():_epfd(-1){_epfd=epoll_create(256); //構造epoll模型if(_epfd<0) {LOG(LogLevel::FATAL)<<"epoll_create error";exit(EPOLL_CREATE_ERR);}LOG(LogLevel::INFO)<<"epoll_create success";}void AddEvent(int sockfd,uint32_t events) //添加關心的事件{struct epoll_event ev;ev.events=events;ev.data.fd=sockfd;int n=epoll_ctl(_epfd,EPOLL_CTL_ADD,sockfd,&ev);if(n<0){LOG(LogLevel::FATAL)<<"epoll_ctl error";exit(EPOLL_CTL_ERR);}LOG(LogLevel::INFO)<<"epoll_ctl success";}int WaitEvents(struct epoll_event revs[],int size,int timeout) //就緒隊列中就緒事件個數{int n=epoll_wait(_epfd,revs,size,timeout);if(n<0){LOG(LogLevel::WARNING)<<"epoll_wait error";return n;}else if(n==0){LOG(LogLevel::WARNING)<<"epoll_wait timeout";return n;}else{LOG(LogLevel::INFO)<<"讀事件就緒";return n;}}void ModEvent(int sockfd,uint32_t events){struct epoll_event ev;ev.data.fd=sockfd;ev.events=events;int n=epoll_ctl(_epfd,EPOLL_CTL_MOD,sockfd,&ev);if(n<0){LOG(LogLevel::FATAL)<<"mov_event error";exit(EPOLL_CTL_ERR);}LOG(LogLevel::INFO)<<"mov_event success";}void DelEvent(int sockfd){int n=epoll_ctl(_epfd,EPOLL_CTL_DEL,sockfd,nullptr);if(n<0){LOG(LogLevel::FATAL)<<"del_event error";exit(EPOLL_CTL_ERR);}LOG(LogLevel::INFO)<<"del_event success";}~Epoller(){}
private:int _epfd;
};
根據epoll的函數進行封裝。
Connection.hpp
#pragma once
#include <iostream>
#include <string>
#include "InetAddr.hpp"using namespace std;
using namespace LogModule;using handler_t=function<string(string&)>;class Reactor;
class Connection;//封裝fd,保證每個fd一套緩沖區
//基類
class Connection
{
public:Connection(){}virtual void Recver()=0;virtual void Sender()=0;virtual void EXcepter()=0; //異常處理virtual int GetSockFd() = 0;void SetEvent(const uint32_t &events)//獲取事件{_events=events;}uint32_t GetEvent()//返回事件{return _events; }void SetOwner(Reactor* owner)//設置回指指針{_owner=owner;}Reactor* GetOwner()//返回回指指針{return _owner;}void RegisterHandler(handler_t handler){_handler=handler;}~Connection(){}
private: Reactor *_owner; //回指指針uint32_t _events; //關系的事件
public:handler_t _handler;
};
作為基類,純虛方法給Listener和Channel。事件管理和回指指針,為什么用回指指針后面Listener中聊。注冊回調也在后面用到的時候聊。
Listener.hpp 和 Channel.hpp
Listener.hpp
#pragma once
#include <iostream>
#include <memory.h>
#include "Connection.hpp"
#include "Common.hpp"
#include "Socket.hpp"
#include "Log.hpp"
#include "Channel.hpp"using namespace std;
using namespace SocketModule;
using namespace LogModule;//獲取新連接
class Listener : public Connection
{
public:Listener(int port=defaultport):_port(port),_listensock(make_unique<TcpSocket>()){_listensock->BuildTcpSocketMethod(_port); //注意這里已經獲取了listen套接字SetEvent(EPOLLIN | EPOLLET); //關心事件設置到connection && ET模式 SetNonBlock(_listensock->Fd()); //文件描述符設置為非阻塞(ET模式)}void Recver() override //收{InetAddr client;while(true)//ET模式循環讀文件描述符{ //這個文件描述符是正常文件描述符int sockfd=_listensock->Accept(&client);if(sockfd==ACCEPTDONE)break;else if(sockfd==ACCEPTCONTINUE)continue;else if(sockfd==ACCEPTERROR)break;else{//成功獲取連接shared_ptr<Connection> conn=make_shared<Channel>(sockfd,client);//構造channel對象conn->SetEvent(EPOLLIN | EPOLLET);if(_handler!=nullptr)conn->RegisterHandler(_handler);GetOwner()->AddConnection(conn);//繼承基類函數 拿到reactor指針進而把channel對象添加到connection}}}void Sender() override{}void EXcepter() override{}int GetSockFd() override{return _listensock->Fd();}~Listener(){}
private:unique_ptr<Socket> _listensock;int _port;
};
Channel.hpp
#pragma once
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <memory>
#include <functional>
#include "Common.hpp"
#include "Connection.hpp"
#include "Log.hpp"
#include "InetAddr.hpp"using namespace std;
using namespace LogModule;#define SIZE 1024
// 普通文件描述符的IO
class Channel : public Connection
{
public:Channel(int sockfd, InetAddr &client): _sockfd(sockfd), _client_addr(client){SetNonBlock(sockfd);}void Recver() override // 收客戶端數據{char buffer[SIZE];while (true){buffer[0] = 0;ssize_t n = recv(_sockfd, buffer, sizeof(buffer) - 1, 0);if (n > 0){buffer[n] = 0;_inbuffer += buffer;}else if (n == 0) // 讀到數據為0{EXcepter(); // 進入異常處理return;}else // 讀取失敗{if (errno == EAGAIN || errno == EWOULDBLOCK){break;}else if (errno == EINTR){continue;}else{EXcepter();return;}}}LOG(LogLevel::DEBUG) << "Channel inbuffer->" << _inbuffer;if (!_inbuffer.empty()){_outbuffer = _handler(_inbuffer); // 處理完的數據給outbuffer}if (!_outbuffer.empty()) // 發數據{Sender();}}void Sender() override{while (true){ssize_t n = send(_sockfd, _outbuffer.c_str(), _outbuffer.size(), 0);if (n > 0){_outbuffer.erase(0, n); //發送緩沖區收到多少數據就清空多少個if (_outbuffer.empty())break;}else if (n == 0){break;}else{if (errno == EAGAIN || errno == EWOULDBLOCK){break;}else if (errno == EINTR){continue;}else{EXcepter();return;}}}//注意什么時候可以設置關系寫事件,寫事件默認就是就緒的只有當發送緩沖區數據滿了的時候才設置關系寫事件//發送緩沖區已經滿了,outbuffer的數據發不到發送緩沖區中了,所以判斷outbuffer不為空 //這時候就要關系寫事件,什么時候發送緩沖區有空位了outbuffer再往發送緩沖區中寫if (!_outbuffer.empty()){GetOwner()->EnableReadWrite(_sockfd,true,true);}else{GetOwner()->EnableReadWrite(_sockfd,true,false);}}void EXcepter() override //異常處理{GetOwner()->DelConnection(_sockfd);}int GetSockFd() override // 獲取文件描述符{return _sockfd;}~Channel(){}private:int _sockfd; // 套接字string _inbuffer;string _outbuffer;InetAddr _client_addr;
};
Listener繼承connection,構造的時候獲取連接,把設置關系事件到connection中,然后因為是ET模式邊緣觸發要把文件描述符設置為非阻塞的。
繼承connection要實現connection的虛函數,Recver()是接收連接創建普通套接字。拿到普通套接字要進行數據的收發了,這時候Channel就要出場了。設置關心事件到connection,這個Channel對象也要放到_connections中吧,不放到_connections中的話在Reactor中對文件描述符進行索引如果是派發讀數據寫數據那還咋讀咋寫,所以基類connection中的回指指針就派上用場了,通過Reactor的指針用類內的AddConnection
方法把Channel對象也放進去。
Channel主要是進行數據的收發,收數據放到_inbuffer
中讀到數據為零進入異常處理讀取錯誤進行錯誤處理。注冊回調把_inbuffer
通過回調給另一個模塊進行處理然后返回后放到_outbuffer
(發數據)。這里的回調function這些放到基類connection中,這樣Listener和Channel都能繼承到。Listener的if(_handler!=nullptr) conn->RegisterHandler(_handler);
讓注冊回調更加靈活。如果_outbuffer中不為空那么就發數據。發數據這里要注意,什么時候關心寫事件呢寫事件默認就是就緒的只有當發送緩沖區數據滿了的時候才設置關系寫事件發送緩沖區已經滿了,outbuffer的數據發不到發送緩沖區中了,所以判斷outbuffer不為空 這時候就要關系寫事件,什么時候發送緩沖區有空位了outbuffer再往發送緩沖區中寫。所以判斷_outbuffer不為空說明發送緩沖區已經滿了要關心寫事件。
這時候要讓Reactor進行統一管理了,說白了就是epoll中的epoll_ctl關系事件的方式改一下,去關心寫事件,具體不多說了看代碼吧。
Main.cc
#include <iostream>
#include <string>
#include "Reactor.hpp"
#include "Listener.hpp"
#include "Common.hpp"
#include "Log.hpp"
#include "NetCal.hpp"
#include "Protocol.hpp"using namespace std;
using namespace LogModule;static void Usage(std::string proc)
{std::cerr << "Usage: " << proc << " port" << std::endl;
}int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);exit(USAGE_ERR);}uint16_t port = stoi(argv[1]);Enable_Console_Log_Strategy(); // 啟用控制臺輸出// 頂層業務模塊shared_ptr<Cal> cal = make_shared<Cal>();// 構造協議對象shared_ptr<Protocol> protocol = make_shared<Protocol>([&cal](Request &req) -> Response{ return cal->Execute(req); });// connection管理listen連接模塊 間接創建channel(進行io)模塊shared_ptr<Connection> conn = make_shared<Listener>(port);conn->RegisterHandler([&protocol](string &inbuffer) -> string{string response_str;while (true){string package;if (!protocol->Decode(inbuffer, &package)) // 判斷報文完不完整break;response_str += protocol->Execute(package);}LOG(LogLevel::DEBUG) << "結束匿名函數中...: " << response_str;return response_str;});// rector基座 事件派發unique_ptr<Reactor> tsvr = make_unique<Reactor>();tsvr->AddConnection(conn);tsvr->Loop();return 0;
}
應用層模塊就不多說了,已經做到完美解耦合了。頂層應用層業務模塊,然后協議對象去調用業務模塊,connection管理listen連接模塊,在listener中又創建了channel對象。這里進行回調(這個回調是用RegisterHandler在這里就體現出來了,是拿Listener對象調用的然后進行回調)把channel中的_inbuffer的數據回調給協議模塊去進行處理。
Reactor模式高并發網絡服務器——源碼