????????Reactor 是一種事件驅動的設計模式(Event-Driven Pattern),主要用于處理高并發 I/O,特別適合網絡服務器場景。它通過一個多路復用機制監聽多個事件源(如 socket 文件描述符),并在事件就緒時將事件分發給對應的處理器(回調函數)執行。
六、單reactor單線程模型
1、核心思想
????????利用一個統一的事件分發中心(
EventLoop
),在單個線程中通過 I/O 多路復用機制(如 epoll)高效監聽和響應多個并發連接的 I/O 事件,將事件檢測與事件處理解耦,從而實現結構簡單、性能良好的并發 I/O 服務器。
2、核心組件
類名 | 主要職責 |
---|---|
Channel | 封裝一個 fd 的事件及回調,是 fd 與 epoll 的橋梁 |
EventLoop | 管理 epoll 及事件分發,是每個線程中的核心循環 |
TcpServer | 管理監聽 socket 和連接接入邏輯,創建 TcpConn |
TcpConn | 封裝已建立的連接的讀寫處理、緩沖、生命周期等 |
1)Channel類-事件通道
Channel 是 一個 fd 與其事件處理邏輯之間的中介,負責:
事件注冊:設置監聽哪些事件(如
EPOLLIN
,EPOLLOUT
)事件觸發:內核通知時,調用用戶設置的回調函數
與 EventLoop 協作:將事件注冊/修改到
epoll
中
.h
/* 負責在事件分發系統中起到“事件通道”的作用,連接底層的I/O多路復用機制(如epoll、select、poll)和具體的事件處理邏輯 */ class EventLoop;class Channel { public:Channel(EventLoop& loop, int fd);~Channel();// 設置 fd 對應的感興趣事件(EPOLLIN/EPOLLOUT)void EnableReading();void EnableWriting();void DisableWriting();void DisableAll();// 用戶提供的讀寫事件回調void SetReadCallback(std::function<void()> cb);void SetWriteCallback(std::function<void()> cb);// 實際處理事件觸發,調用該函數判斷是否調用 read_cb_ 或 write_cb_void HandleEvent(uint32_t events);// 獲取 fdint GetFd() const;// 獲取事件uint32_t GetEvents() const;// 將當前 Channel 注冊到 epoll 或修改其狀態。void Update();private:int fd_; // 監聽的文件描述符bool added_ = false; // 是否已添加到 epollEventLoop& loop_; // 所屬的事件循環uint32_t events_; // 當前監聽的事件類型(EPOLLIN/EPOLLOUT)std::function<void()> read_cb_; // 讀事件回調std::function<void()> write_cb_; // 寫事件回調 };
函數名 | 功能 | 調用時機 |
---|---|---|
EnableReading() | 關注讀事件并更新 epoll 狀態 | 監聽 socket / 連接 socket 可讀時 |
EnableWriting() | 關注寫事件(注冊 EPOLLOUT) | Send() 數據寫不完時注冊 |
DisableWriting() | 注銷寫事件(避免忙等) | 寫完所有 buffer 后 |
HandleEvent(events) | 根據 epoll 返回事件觸發回調 | epoll_wait 返回后由 EventLoop 調用 |
Update() | 將本 Channel 注冊或修改到 epoll | 每次 Enable/Disable 事件后必須調用 |
.cpp
Channel::Channel(EventLoop& loop, int fd): fd_(fd), loop_(loop), events_(0) {}Channel::~Channel() {DisableAll(); }void Channel::EnableReading() {events_ |= EPOLLIN;Update(); }void Channel::EnableWriting() {events_ |= EPOLLOUT;Update(); }void Channel::DisableWriting() {events_ &= ~EPOLLOUT;Update(); }void Channel::DisableAll() {loop_.DelEvent(fd_);events_ = 0; }void Channel::SetReadCallback(std::function<void()> cb) {read_cb_ = std::move(cb); }void Channel::SetWriteCallback(std::function<void()> cb) {write_cb_ = std::move(cb); }void Channel::HandleEvent(uint32_t events) {if ((events & EPOLLIN) && read_cb_) read_cb_();if ((events & EPOLLOUT) && write_cb_) write_cb_(); }int Channel::GetFd() const {return fd_; }uint32_t Channel::GetEvents() const {return events_; }void Channel::Update() {if (!added_) {loop_.AddEvent(fd_, events_, this);added_ = true;} else {loop_.ModEvent(fd_, events_, this);} }
2)EventLoop類-事件循環
EventLoop 是 Reactor 的核心調度器,負責:
管理所有 Channel 的事件監聽
調用 epoll_wait 等待就緒事件
分發事件并調用 Channel 的處理函數
.h
#define MAX_EVENTS 1024/* 負責事件的等待、分發和調度執行*/ class EventLoop { public:EventLoop() ;~EventLoop();void AddEvent(int fd, uint32_t events, void *ptr);void ModEvent(int fd, uint32_t events, void *ptr);void DelEvent(int fd);void Run();void stop(); private:int epfd_;bool running_; };
函數名 | 功能 | 說明 |
---|---|---|
Run() | 啟動事件循環 | 持續調用 epoll_wait 并觸發回調 |
stop() | 停止事件循環 | 設置 running_ 為 false |
AddEvent(fd, events, ptr) | 添加一個 fd 到 epoll | fd 封裝在 Channel 內,由 TcpConn 創建 |
ModEvent(fd, events, ptr) | 修改 fd 的監聽事件 | 典型于 Send() 或關閉寫事件 |
DelEvent(fd) | 從 epoll 中刪除 fd | 連接關閉、Channel 銷毀時 |
.cpp
EventLoop::EventLoop() : epfd_(::epoll_create1(0)), running_(true) {if (epfd_ == -1){std::cerr << "epoll_create error: " << errno << std::endl;exit(EXIT_FAILURE);} }EventLoop::~EventLoop() {close(epfd_); }void EventLoop::AddEvent(int fd, uint32_t events, void *ptr) {epoll_event ev;ev.events = events;ev.data.ptr = ptr;if (::epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev) == -1){std::cerr << "epoll_ctl add error: " << errno << std::endl;} }void EventLoop::ModEvent(int fd, uint32_t events, void *ptr) {epoll_event ev;ev.events = events;ev.data.ptr = ptr;if (::epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &ev) == -1){std::cerr << "epoll_ctl mod error: " << errno << std::endl;} }void EventLoop::DelEvent(int fd) {if (::epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr) == -1){std::cerr << "epoll_ctl del error: " << errno << std::endl;} }void EventLoop::Run() {epoll_event events[MAX_EVENTS];while (running_){// 超時參數傳入TimerInstance()->WaitTime(),功能是確保 epoll_wait 最遲要在定時任務到期時返回,否則任務會延遲處理。int nfds = ::epoll_wait(epfd_, events, MAX_EVENTS, TimerInstance()->WaitTime());if (nfds == -1){if (errno == EINTR) // EINTR(系統中斷)時忽略重試,其他錯誤打印后直接返回。continue;std::cerr << "epoll_wait error: " << errno << std::endl;return;}// 遍歷就緒事件數組for (int i = 0; i < nfds; ++i){// 獲取就緒事件存儲在 data.ptr 的 Channel*Channel* ch = static_cast<Channel*>(events[i].data.ptr);// 觸發讀/寫等回調ch->HandleEvent(events[i].events);}// 處理定時器任務TimerInstance()->HandleTimeout();} }void EventLoop::stop() {running_ = false; }
3)TcpServer-連接管理
TcpServer 是服務端框架的頂層組織者,負責:
創建監聽 socket:創建 socket、bind、listen
創建Accept?Channel:負責接收新連接事件并注冊到 EventLoop
創建新連接回調:接收到新連接后,通過回調交由用戶處理
.h
class TcpConn; class EventLoop;class TcpServer { public:// 新連接回調,供用戶處理新連接事件。using NewConnCallback = std::function<void(std::shared_ptr<TcpConn>)>;TcpServer(EventLoop& loop);~TcpServer();// 啟動 TCP 服務監聽,注冊新連接的回調void Start(uint16_t port, NewConnCallback cb);private:// 新連接建立時的處理邏輯,作為回調函數使用void HandleAccept();private:EventLoop& loop_;int listen_fd_;std::shared_ptr<Channel> accept_channel_; // 監聽 fd 的事件通道,負責讀事件注冊(新連接到來)NewConnCallback new_conn_cb_; // 外部注入的新連接回調函數 };
函數名 | 功能 | 說明 |
---|---|---|
Start(port, cb) | 初始化 socket,設置 Channel 和回調 | 注冊 EPOLLIN 用于接收連接 |
HandleAccept() | 有新連接到來時被觸發 | accept() 然后調用回調構建 TcpConn |
new_conn_cb_ | 用戶設置的處理邏輯 | 通常設置為 lambda 創建 TcpConn 實例 |
.cpp
TcpServer::TcpServer(EventLoop& loop): loop_(loop), listen_fd_(-1) {}TcpServer::~TcpServer() {if (listen_fd_ != -1) {accept_channel_->DisableAll();close(listen_fd_);} }void TcpServer::Start(uint16_t port, NewConnCallback cb) {new_conn_cb_ = std::move(cb); // 保存回調函數// 創建 IPv4 TCP 套接字,非阻塞模式listen_fd_ = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);if (listen_fd_ == -1) {std::cerr << "socket error: " << errno << std::endl;return;}// 配置 SO_REUSEADDR(端口立即重用,避免“Address already in use”) 和 SO_REUSEPORT(多進程監聽同一端口,可用于多核負載均衡)int opt = 1;setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));// 綁定 socket 到指定端口地址sockaddr_in addr{};addr.sin_family = AF_INET;addr.sin_addr.s_addr = INADDR_ANY;addr.sin_port = htons(port);if (bind(listen_fd_, (sockaddr*)&addr, sizeof(addr)) == -1) {std::cerr << "bind error: " << errno << std::endl;close(listen_fd_);return;}// 開始監聽,監聽隊列長度為 SOMAXCONN(4096)if (listen(listen_fd_, SOMAXCONN) == -1) {std::cerr << "listen error: " << errno << std::endl;close(listen_fd_);return;}// 創建 Channel 對象,監聽 listen_fd_ 上的事件,會注冊到 epoll 中accept_channel_ = std::make_shared<Channel>(loop_, listen_fd_);// 設置讀事件回調函數:即新連接到來時應調用的邏輯函數accept_channel_->SetReadCallback([this]() { HandleAccept(); });// 啟動監聽accept_channel_->EnableReading();std::cout << "Server listening on port " << port << std::endl; }void TcpServer::HandleAccept() {sockaddr_in client_addr{};socklen_t len = sizeof(client_addr);// 接受新連接,返回新的客戶端 fd,設置非阻塞int conn_fd = accept4(listen_fd_, (sockaddr*)&client_addr, &len, SOCK_NONBLOCK);if (conn_fd == -1) return;// 創建新的連接對象(TcpConn)管理該客戶端 fdauto conn = std::make_shared<TcpConn>(conn_fd, loop_);// 調用用戶邏輯處理這個連接if (new_conn_cb_) new_conn_cb_(conn); }
4)TcpConn-與客戶端通信
TcpConn 封裝了每個 TCP 連接的生命周期、讀寫事件、緩沖區管理等。
.h
/**聲明 TcpConn 類,同時繼承 enable_shared_from_this,方便在回調中安全獲取 shared_ptr<TcpConn> 自己的智能指針。*/ class TcpConn : public std::enable_shared_from_this<TcpConn> { public:// 聲明回調函數using ReadCallback = std::function<void()>;using CloseCallback = std::function<void()>;TcpConn(int fd, EventLoop& loop);~TcpConn();// 設置回調函數void SetReadCallback(ReadCallback cb);void SetCloseCallback(CloseCallback cb);// 異步發送數據int Send(const char* data, size_t size);// 獲取當前接收緩沖區內的全部數據std::string GetAllData();private:// 內部事件處理函數:讀取、寫入、關閉連接void HandleRead();void HandleWrite();void Close();private:int fd_;bool closed_;EventLoop& loop_;MessageBuffer input_buffer_; // 讀緩沖區std::string output_buffer_{}; // 寫緩沖區std::shared_ptr<Channel> channel_; // 封裝 fd 的 epoll 管理類ReadCallback read_cb_; // 外部注入的回調函數CloseCallback close_cb_; };
函數名 | 功能 | 調用說明 |
---|---|---|
HandleRead() | 可讀事件觸發時從 fd 讀數據到 input_buffer_ | 然后觸發 read_cb_ |
HandleWrite() | 可寫事件觸發時將 output_buffer_ 中數據寫出 | 寫完后注銷 EPOLLOUT |
Send(data) | 異步發送數據 | 若 fd 可寫則立即寫,否則加入 buffer |
GetAllData() | 獲取 input_buffer_ 所有數據 | 可在 onMessage 中使用 |
Close() | 主動關閉連接 | 注銷事件、關閉 fd、觸發 close_cb_ |
.cpp
TcpConn::TcpConn(int fd, EventLoop& loop): fd_(fd), closed_(false), loop_(loop) {// // 設置 socket 為非阻塞模式// int flags = fcntl(fd, F_GETFL, 0);// fcntl(fd, F_SETFL, flags | O_NONBLOCK);// 創建 Channel 并設置讀寫回調,注冊 EPOLLIN 監聽可讀事件。channel_ = std::make_shared<Channel>(loop_, fd);channel_->SetReadCallback([this]() { HandleRead(); });channel_->SetWriteCallback([this]() { HandleWrite(); });channel_->EnableReading(); }TcpConn::~TcpConn() {Close(); }void TcpConn::SetReadCallback(ReadCallback cb) {read_cb_ = std::move(cb); }void TcpConn::SetCloseCallback(CloseCallback cb) {close_cb_ = std::move(cb); }int TcpConn::Send(const char* data, size_t size) {// 若連接已關閉或無數據,直接返回。if (closed_ || data == nullptr || size == 0) return -1;// 如果寫緩沖區中已有未發送的數據,追加數據并監聽寫事件。if (!output_buffer_.empty()) {output_buffer_.append(data, size);channel_->EnableWriting();return size;}// 直接發送(零拷貝),MSG_NOSIGNAL:防止對方關閉連接時觸發 SIGPIPE 信號(使得進程崩潰)。int n = send(fd_, data, size, MSG_NOSIGNAL);if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // 非阻塞 socket 下,內核發送緩沖區可能暫時滿// 將數據緩存到用戶態 output_buffer_, 監聽寫事件output_buffer_.append(data, size);channel_->EnableWriting();} else if (n > 0 && n < static_cast<int>(size)) { // 有部分數據被寫入 socket(如發送了 n 字節,小于總長度 size)// 剩下的數據未能寫完,需要緩存到 output_buffer_,并監聽寫事件output_buffer_.append(data + n, size - n);channel_->EnableWriting();} else if (n < 0) {Close();}return n; }std::string TcpConn::GetAllData() {auto data = input_buffer_.GetAllData();if (data.first != nullptr) {// 獲取讀緩沖區全部有效數據std::string result(reinterpret_cast<char*>(data.first), data.second);// 標記已讀的數據input_buffer_.ReadCompleted(data.second);return result;}return ""; }void TcpConn::HandleRead() {int err = 0;// 調用 MessageBuffer::Recv() 使用 readv() 讀取數據,讀取數據到緩沖區中。int n = input_buffer_.Recv(fd_, &err);if (n > 0 && read_cb_) {read_cb_(); // 觸發讀取回調邏輯} else if (n == 0 || (n < 0 && err != EAGAIN && err != EWOULDBLOCK)) { // 連接關閉或錯誤則關閉連接。Close();} }void TcpConn::HandleWrite() {// 如果寫緩沖區為空,則取消監聽寫事件。if (output_buffer_.empty()) {channel_->DisableWriting();return;}// 緩沖區不為空,調用 send() 發送數據,發送成功則刪除發送緩沖區中的數據。int n = send(fd_, output_buffer_.data(), output_buffer_.size(), MSG_NOSIGNAL);if (n > 0) {output_buffer_.erase(0, n);if (output_buffer_.empty()) {channel_->DisableWriting();}} else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { // 非阻塞錯誤以外的寫失敗則關閉連接Close();} }void TcpConn::Close() {if (closed_) return;closed_ = true;channel_->DisableAll();close(fd_);if (close_cb_) close_cb_(); }
3、完整調用流程
啟動階段:
TcpServer::Start(port, cb)
:
創建
listen_fd
,設為非阻塞創建
accept_channel_
封裝listen_fd
設置其
read_cb
為TcpServer::HandleAccept
將其注冊到
EventLoop
中(AddEvent()
)
有連接到來:
內核通知
listen_fd
可讀 →Channel::HandleEvent()
→read_cb_
→TcpServer::HandleAccept()
:
調用
accept()
,獲得conn_fd
設置為非阻塞
構建
TcpConn
對象,封裝連接的生命周期創建該連接的
Channel
,并注冊其read_cb_
→TcpConn::HandleRead
數據收發流程:
conn_fd
可讀 → epoll 通知 →EventLoop
觸發TcpConn::HandleRead()
:
從 socket 讀入數據到
input_buffer_
調用
read_cb_
讓上層應用邏輯處理數據
TcpConn::Send()
:
若當前 fd 可寫,則直接發送
否則寫入
output_buffer_
,并注冊EPOLLOUT
事件
channel_->SetWriteCallback(...)
設定寫回調
conn_fd
可寫 →TcpConn::HandleWrite()
:
從
output_buffer_
中寫出數據若 buffer 為空,注銷寫事件,調用
writeCompleteCallback