Poller抽象層代碼
Muduo 網絡庫中的 Poller 抽象層是其事件驅動模型的核心組件之一,負責統一封裝不同 I/O 復用機制(如 epoll、poll),實現事件監聽與分發。
Poller 抽象層的作用
- 統一 I/O 復用接口
- Poller 作為抽象基類,定義了 poll()、updateChannel()、removeChannel() 等純虛函數,強制派生類(如 EPollPoller)實現這些接口。這使得上層模塊(如 EventLoop)無需關心底層具體使用 epoll 還是 poll,只需通過基類指針操作,實現了 “多路分發器” 的功能?
- 解耦事件循環與 I/O 機制
- EventLoop 依賴 Poller 監聽文件描述符(fd)事件,但通過抽象層隔離了具體 I/O 復用機制的實現細節。例如,EventLoop::loop() 調用 Poller::poll() 等待事件,而無需知曉底層是 epoll_wait 還是 poll 調用?
頭文件
#pragma once
#include"noncopyable.h"
#include"Timestamp.h"
#include<vector>
#include<unordered_map>
class Channel;
class Eventloop;class Poller:noncopyable
{public:using ChannelList=std::vector<Channel*>;Poller(Eventloop *loop);virtual ~Poller()=default;//給所有I/O復用保留統一的接口virtual Timestamp poll(int timeoutMs,ChannelList *activeChannels)=0;virtual void updateChannel(Channel *channel)=0;virtual void removeChannel(Channel *channnel)=0; //判斷參數channel是否在當前Poller當中bool hasChannel(Channel *channel) const;//事件循環可以通過當前接口獲取默認的I/O復用的具體實現static Poller* newDefaultPoller(Eventloop *loop);protected://這里map的key就是sockfd value就是sockfd所屬的通道類型channelusing ChannelMap=std::unordered_map<int,Channel*>;ChannelMap Channels_;private:Eventloop *ownerLoop_;//定義poller所屬的事件循環EventLoop
};
1、存放有事件發生的 Channel
using ChannelList = std::vector<Channel*>;
poll 函數會把有事件發生的 Channel?指針填充到這個列表中。
Poller能監聽到哪些channel發生事件了,獲取活躍事件,然后遍歷處理,然后上報給Eventloop,然后通知channel處理相應的事件
2、構造函數
Poller(EventLoop *loop);
初始化必須知道所屬 EventLoop,
這是在建立 Poller→EventLoop 的反向指針
由 EventLoop 在初始化時創建(EventLoop 持有 Poller 實例)
3、事件輪循
virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels) = 0;
定義了一個跨平臺 I/O 多路復用的抽象接口,派生類必須重寫用它來實現 epoll、poll 等系統調用,上層EventLoop 通過它等待事件并分發處理,返回事件發生的精確時間戳
4、更新/添加通道監控
virtual void updateChannel(Channel *channel) = 0;
當 Channel 的事件關注集變化時(如新增寫監聽)
調用底層 epoll_ctl/poll 更新監聽狀態
5、移除通道監控
virtual void removeChannel(Channel *channel) = 0;
當 Channel 銷毀時(如連接關閉)
- 清理 Poller 內部資源
- 由 Channel::remove() 調用
- 清除 channels_ 映射關系
- 調用底層 epoll_ctl/poll 移除監聽
6、判斷參數channel是否在當前Poller當中
bool hasChannel(Channel *channel) const;
防止重復注冊同一個 Channel 到 Poller 中,避免重復添加導致的崩潰或未定義行為。
7、默認I/O的復用的具體實現
通過該接口,可以獲取到想要的具體的IO復用實例,是Epoll、Poll、還是select
static Poller* newDefaultPoller(EventLoop *loop);
這里又學到一招,這個接口的實現要放在一個單獨的源文件(DefaultPoller.cc)中,因為這里需要 new 出來一個EpollPoller實例,如果放在(Poller.cc)中實現,就會出現基類包含子類頭文件的情況。(Poller是基類,EPollPoller是子類)。
8、維護 fd → Channel 的映射
using ChannelMap = std::unordered_map<int, Channel*>;
ChannelMap Channels_;
這里map的key就是sockfd value就是sockfd所屬的通道類型channel
快速通過 fd 查找 Channel,存儲所有被管理的 Channel,避免重復管理同一個 fd.
9、poller所屬的事件循環EventLoop
EventLoop *ownerLoop_;
反向指針 - 指向所屬 EventLoop
源文件Poller.cc
#include"Poller.h"
#include"Channel.h"Poller::Poller(Eventloop *loop):ownerLoop_(loop){}bool Poller::hasChannel(Channel *channel) const
{auto it=Channels_.find(channel->fd());return it!=Channels_.end() && it->second==channel;
}
channel->fd():獲取 Channel 底層的文件描述符
Channels_.find():在 unordered_map<int, Channel*> 中 O(1) 復雜度查找
源文件DefaultPoller.cc
//細節性設計,為了不讓基類包含子類的頭文件
#include"Poller.h"
#include"EPollPoller.h"
#include<stdlib.h>
Poller* Poller::newDefaultPoller(Eventloop *loop)
{if(::getenv("MUDUO_USE_POLL"))//獲取環境變量,如果有則生成poll實例,(本代碼沒有實現){return nullptr;//生成poll的實例}else{return new EPollPoller(loop);//生成epoll的實例}
}
EPollPoller
接下來這個模塊用來封裝Linux 的 epoll I/O 多路復用機制
頭文件
#pragma once
#include"Poller.h"
#include<vector>
#include<sys/epoll.h>
#include"Timestamp.h"class EPollPoller:public Poller{public:EPollPoller(Eventloop* loop);~EPollPoller() override;//重寫基類Poller的方法Timestamp poll(int timeoutMs,ChannelList *activeChannels) override;void updateChannel(Channel *channel) override;void removeChannel(Channel *channel) override;
private:static const int kInitEventListSize=16;//給定的vector的一個初始長度默認是16,可以面試證明自己看過源碼,為啥是靜態//填寫活躍的連接void fillActiveChannels(int numEvents,ChannelList* activeChannels) const;//更新channel通道void update(int operation ,Channel* channel);using EventList=std::vector<epoll_event>;int epollfd_;EventList events_;//這個將來是作為epoll_wait的第二個參數};
成員變量
static const int kInitEventListSize=16;//Muduo庫默認給定的vector的一個初始長度默認是16
using EventList=std::vector<epoll_event>;
int epollfd_;
EventList events_;//這個將來是作為epoll_wait的第二個參數
定義一個數組,用于存儲epoll_wait返回的就緒事件表,使用vector動態擴展事件緩沖區(當就緒事件數 == 緩沖區大小時自動擴容)
成員函數
1、重寫Poller的方法
Timestamp poll(int timeoutMs,ChannelList *activeChannels) override;
void updateChannel(Channel *channel) override;
void removeChannel(Channel *channel) override;
這是在Poller類中聲明的純虛函數,繼承Poller之后對這些函數必須進行重寫,
- poll()函數:等待并獲取當前有事件發生的 Channel 列表。調用 epoll_wait() 等待事件。將“就緒的事件”轉換為 Channel* 指針,填充到 activeChannels 中。
- updateChannel():將一個 Channel 注冊或修改到 epoll 中(對應 epoll_ctl(ADD/MOD))。當Channel 的事件發生變化(如從不監聽可讀 → 監聽可讀),需要通知 EPollPoller。根據 Channel 的狀態決定是 EPOLL_CTL_ADD 還是 EPOLL_CTL_MOD。
- removeChannel():從 epoll 中移除一個 Channel(對應 epoll_ctl(DEL)),從channels_映射表中刪除,并清理內部狀態。當 Channel 對應的資源(如 socket)關閉時,必須從 epoll 中注銷。防止后續 epoll_wait 返回無效的 Channel。
2、填寫活躍的連接
void fillActiveChannels(int numEvents,ChannelList* activeChannels) const;
- 遍歷就緒事件
- 設置Channel的revents_
- 填充到activeChannels列表
3、更新channel通道
void update(int operation ,Channel *channel);
執行具體的epoll_ctl操作
源文件
#include"EPollPoller.h"
#include"Logger.h"
#include<unistd.h>
#include<string.h>
#include"Channel.h"
//#include<errno.h>
//表示channel是否添加到poller,與channel的成員index_相關,它的初始值就是-1,表示還沒有添加到里面
//用來區分,如果再次添加channel時候發現是kAdded表示已經添加到里面了,那這次操作就是更新channel中的事件
const int kNew=-1;//一個channel還沒有添加到poller里邊
const int kAdded=1;//一個channel已經添加到poller里面嗎
const int kDeleted=2;//刪除EPollPoller::EPollPoller(Eventloop *loop):Poller(loop),epollfd_(::epoll_create1(EPOLL_CLOEXEC))//EPOLL_CLOEXEC:exec 時自動關閉文件描述符,安全防護,events_(kInitEventListSize)
{if(epollfd_<0){LOG_FATAL("epoll_create error:%d \n",errno);}
}
EPollPoller::~EPollPoller()
{::close(epollfd_);
}
//返回具體發生事件的時間點,主要調用的是epoll_wait
Timestamp EPollPoller::poll(int timeoutMs,ChannelList *activeChannels)
{//poll這里短時間是接受大量的高并發請求的,如果在這里使用LOG_INFO則每次調用都會影響它的性能//實際上這里應該用LOG_DEBUG,只要我們不定義debug,他就不用調用進而不會影響它的性能
//記錄當前系統中“有多少個連接正在被監聽”。LOG_INFO("func=%s => fd total count:%lu \n",__FUNCTION__,Channels_.size());//第二個參數是存放發生事件的數組地址,但是為了方便擴容,//我們用的是vector.events_.size()返回的是size_t類型,但是這里參數要求int,所以使用轉換int numEvents=::epoll_wait(epollfd_,&*events_.begin(),static_cast<int>(events_.size()),timeoutMs);int saveErrno=errno;Timestamp now(Timestamp::now());if(numEvents>0){LOG_INFO("%d events happend \n",numEvents);fillActiveChannels(numEvents,activeChannels);//擴容if(numEvents==events_.size()){events_.resize(events_.size()*2);}}else if (numEvents==0){LOG_INFO("%s timeout! \n",__FUNCTION__);}else//外部中斷,還要接著執行業務邏輯{if(saveErrno != EINTR){errno=saveErrno;//不太懂LOG_ERROR("EPollPoller::poll() err!");}}return now;
}
void EPollPoller::updateChannel(Channel *channel)
{const int index=channel->index();LOG_INFO("func=%s => fd=%d events=%d index=%d \n",__FUNCTION__,channel->fd(),channel->events(),index);if(index==kNew || index==kDeleted){if(index==kNew){int fd=channel->fd();Channels_[fd]=channel;}channel->set_index(kAdded);update(EPOLL_CTL_ADD,channel);}else//此時channel已經在poller上注冊過了{int fd=channel->fd();if(channel->isNoneEvent()){update(EPOLL_CTL_DEL,channel);}else{update(EPOLL_CTL_MOD,channel);}}
}
//從poller中刪除channel
void EPollPoller::removeChannel(Channel *channel)
{int fd=channel->fd();Channels_.erase(fd);LOG_INFO("func=%s => fd=%d\n",__FUNCTION__,fd);int index=channel->index();if(index==kAdded){update(EPOLL_CTL_DEL,channel);}channel->set_index(kNew);
}//填寫活躍的連接
void EPollPoller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const
{for(int i=0;i< numEvents;++i){Channel* channel=static_cast<Channel*>(events_[i].data.ptr);channel->set_revents(events_[i].events);activeChannels->push_back(channel);//Eventloop就拿到了poller給它返回的所有發生事件的channel列表了}
}
//更新channel通道,做的是事件的增刪改
void EPollPoller::update(int operation ,Channel* channel)
{epoll_event event;memset(&event,0,sizeof(event));int fd=channel->fd();event.events=channel->events();event.data.fd=fd;event.data.ptr=channel;if(::epoll_ctl(epollfd_,operation,fd,&event)<0){if(operation==EPOLL_CTL_DEL){LOG_ERROR("epoll_ctl del error:%d\n",errno);}else{LOG_FATAL("epoll_ctl add/mod error%d \n",errno);}}
}
1、Channel 狀態常量定義
const int kNew = -1; // Channel 未添加到 Poller
const int kAdded = 1; // Channel 已添加到 Poller
const int kDeleted = 2; // Channel 已從 Poller 移除
//kDeleted 狀態允許 Channel 暫時移除但保留在映射表中,避免頻繁添加/刪除的開銷
定義 Channel 在 Poller 中的生命周期狀態。確保操作的正確順序(不能重復添加已添加的 Channel)。避免不必要的系統調用(如對已刪除 Channel 再次刪除)。
2、poll方法重寫
等待 I/O 事件發生,并將“活躍的 Channel”填充到 activeChannels 中
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels) {LOG_INFO("func=%s => fd total count:%lu \n", __FUNCTION__, Channels_.size());int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);int saveErrno = errno; // 保存系統錯誤碼Timestamp now(Timestamp::now());if(numEvents > 0) {LOG_INFO("%d events happend \n", numEvents);fillActiveChannels(numEvents, activeChannels);// 動態擴容if(numEvents == events_.size()) {events_.resize(events_.size() * 2);}}else if (numEvents == 0) {LOG_INFO("%s timeout! \n", __FUNCTION__);}else { // 錯誤處理if(saveErrno != EINTR) { // 非中斷錯誤errno = saveErrno;LOG_ERROR("EPollPoller::poll() err!");}}return now;
}
2.1?
int numEvents=::epoll_wait(epollfd_,&*events_.begin(),static_cast<int>(events_.size()),timeoutMs);
這里的第二個參數:
- events_.begin() → 返回一個 iterator,指向第一個元素。
- *events_.begin() → 解引用 iterator,得到第一個 epoll_event 對象。
- &*events_.begin() → 取第一個對象的地址,即 struct epoll_event*。
第三個參數
- 我們用的是vector.events_.size()返回的是size_t類型,但是這里參數要求int,所以使用轉換
2.2
int saveErrno=errno;
epoll_wait 返回 -1 時,錯誤原因在 errno 中。但后續代碼可能調用其他函數(如 LOG_INFO),會覆蓋 errno。所以必須立即保存,否則錯誤信息丟失。
這是 Linux 系統編程的 黃金法則:一旦系統調用失敗,立刻保存 errno。
2.3
else//外部中斷,還要接著執行業務邏輯{if(saveErrno != EINTR){errno=saveErrno;//雖然日志不需要了,但有些設計會保留 errno 給上層處理。LOG_ERROR("EPollPoller::poll() err!");}}
EINTR:系統調用被信號中斷(如 SIGCHLD、SIGTERM)。這是正常情況,不應視為錯誤,循環會繼續。如果是其他錯誤(如 EBADF、ENOMEM)才是真正的異常。
3、updateChannel重寫
狀態驅動 —— 通過 channel->index() 的狀態決定行為,即根據 Channel 的當前狀態,決定是 epoll_ctl(ADD)、MOD 還是 DEL。
void EPollPoller::updateChannel(Channel *channel)
{const int index=channel->index();//獲取狀態,是未注冊到Poller、已注冊還是已刪除LOG_INFO("func=%s => fd=%d events=%d index=%d \n",__FUNCTION__,channel->fd(),channel->events(),index);if(index==kNew || index==kDeleted){if(index==kNew){int fd=channel->fd();Channels_[fd]=channel;}channel->set_index(kAdded);update(EPOLL_CTL_ADD,channel);}else//此時channel已經在poller上注冊過了{int fd=channel->fd();if(channel->isNoneEvent()){update(EPOLL_CTL_DEL,channel);}else{update(EPOLL_CTL_MOD,channel);}}
}
如果是kNew(新連接,首次注冊)或者是 kDeleted(之前被移除,現在重新啟用:比如連接復用、延遲刪除后恢復)。無論之前是否注冊過,只要不是 kAdded,就當作“新來”的處理。
僅在 kNew 時加入 channels_ 映射表
channels_[fd] = channel:建立 fd → Channel 的映射,用于 epoll_wait 返回后查找 Channel。
而遵循?kDeleted 狀態允許 Channel 暫時移除但保留在映射表中,避免頻繁添加/刪除的開銷。
所以不需要將channel再次添加到Channels_中了。
對于已經注冊在Poller上的channel,如果他沒有感興趣的事,就從 epoll 實例中刪除。
3、update()
封裝 epoll_ctl 系統調用,用于向 epoll 實例添加、修改或刪除一個 fd 的事件監聽。
void EPollPoller::update(int operation ,Channel* channel)
{epoll_event event;memset(&event,0,sizeof(event));int fd=channel->fd();event.events=channel->events();event.data.fd=fd;event.data.ptr=channel;//關鍵:存儲 Channel* 用于事件分發if(::epoll_ctl(epollfd_,operation,fd,&event)<0){if(operation==EPOLL_CTL_DEL){LOG_ERROR("epoll_ctl del error:%d\n",errno);}else{LOG_FATAL("epoll_ctl add/mod error%d \n",errno);}}
}
epoll_event 是 epoll 的核心數據結構,用于描述一個 fd 的監聽事件和用戶數據。
event.data.ptr=channel::最關鍵字段!綁定用戶數據,后面用于 fillActiveChannels()?中直接取出 Channel*
4、removeChannel重寫
void EPollPoller::removeChannel(Channel *channel) {int fd = channel->fd();Channels_.erase(fd); // 從映射表移除LOG_INFO("func=%s => fd=%d\n", __FUNCTION__, fd);int index = channel->index();if(index == kAdded) {update(EPOLL_CTL_DEL, channel); // 從epoll移除}channel->set_index(kNew); // 重置狀態
}
5、填充活躍的連接
void EPollPoller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const
{for(int i=0;i< numEvents;++i){Channel* channel=static_cast<Channel*>(events_[i].data.ptr);channel->set_revents(events_[i].events);activeChannels->push_back(channel);//Eventloop就拿到了poller給它返回的所有發生事件的channel列表了}
}
numEvents:來自 epoll_wait 的返回值,表示有多少個 fd 就緒。遍歷 events_[i] 數組中的每一個就緒事件。
接著從 epoll_event.data.ptr 取出 Channel*,上面update()中設置過
event.data.ptr = channel;
epoll 本身不關心用戶數據,只返回 fd 和 events。但我們需要知道“哪個 Channel 對應這個 fd”。
通過 data.ptr 直接綁定 Channel*,實現了 O(1) 的快速查找,避免了 channels_[fd] 的哈希查找。
利用了空間換時間。
events_ 是 EPollPoller 的成員變量:std::vector<epoll_event> events_;接著設置實際發生的事件。
接著將活躍的channel添加到活躍列表activeChannels中,activeChannels 是一個輸出參數,類型為 std::vector<Channel*>*。這個列表會被返回給 EventLoop,后續遍歷并調用 channel->handleEvent()。
感謝閱讀!