系列文章目錄
第一篇 基于SRS 的 WebRTC 環境搭建
第二篇 基于SRS 實現RTSP接入與WebRTC播放
第三篇 centos下基于ZLMediaKit 的WebRTC 環境搭建
第四篇 WebRTC學習一:獲取音頻和視頻設備
第五篇 WebRTC學習二:WebRTC音視頻數據采集
第六篇 WebRTC學習三:WebRTC音視頻約束
第七篇 WebRTC學習四:WebRTC常規視覺濾鏡
第八篇 WebRTC學習五:從視頻中提取圖片
第九篇 WebRTC學習六:MediaStream 常用API介紹
第十篇 WebRTC學習七:WebRTC 中 STUN 協議詳解
ZLMediaKit源碼分析——[1] 開篇:基礎庫 ZLToolKit 之 onceToken 源碼分析
ZLMediaKit源碼分析——[2] 從 ZLToolKit 代碼看 CPU 親和性設計
ZLMediaKit源碼分析——[3] ZLToolKit 中EventPoller之網絡事件處理
文章目錄
- 系列文章目錄
- 前言
- 一、EventPoller網絡事件處理機制
- 二、EventPoller類圖分析
- 2.1 類圖
- 2.2 相關類功能介紹
- 三、 事件循環驅動 - runLoop 函數
- 四、事件監聽的添加,修改和刪除
- 4.1 添加事件監聽AddEvent
- 4.2 刪除事件監聽 delEvent
- 4.3 修改監聽事件類型 modifyEvent
- 五、數據結構
- 5.1 _event_map
- 5.2 _event_cache_expired的之作用
- 5.2.1 在 delEvent 函數中的作用
- 5.2.2 在 runLoop 函數中的作用
- 5.2.3 總結
- 總結
前言
當我在ZLToolKit里看到事件輪詢管理類EventPoller的runLoop函數時,腦海中瞬間浮現出多年前live555里的設計——那里同樣有一個doEventLoop()函數,在該函數里執行著網絡事件的select操作、異步任務以及延時隊列。出于好奇,我仔細研讀了EventPoller類的代碼。發現如今其IO事件驅動模型采用的是多實例epoll模型,不過網絡事件、異步任務和延時隊列這些概念依舊存在,這樣的設計讓人倍感熟悉。那么,今天我就來詳細剖析一下ZLM里網絡事件處理模型的具體運作機制。
一、EventPoller網絡事件處理機制
zlmediakit基于?事件驅動+非阻塞I/O?架構,事件處理機制如下:
?核心機制?:使用epoll(Linux)或kqueue(BSD)實現I/O多路復用,監控所有socket的可讀/可寫事件。
?執行流程?:
?事件收集?:調用epoll_wait等待socket事件(如新連接、數據到達)。
事件觸發:調用epoll_ctl添加/修改/刪除網絡事件。
事件處理:調用回調函數執行任務。
其核心類為 EventPoller,關鍵函數是 runLoop ,網絡事件處理中關鍵的數據結構為_event_map,_event_cache_expired,以及事件監聽,刪除,修改的接口構成。
二、EventPoller類圖分析
2.1 類圖
因為press on上面顯示類太多了會看不清晰,這里省略了一些和本節介紹無關的類,保留主要類圖如下:
2.2 相關類功能介紹
AnyStorage 類:這是一個通用的數據存儲類,它可以存儲任意類型的數據。在 zlmediakit 中,它提供了一種靈活的方式來存儲不同類型的值,增強了代碼的通用性和可擴展性,方便在不同模塊之間傳遞和管理數據。
ThreadLoadCounter 類:主要用于統計線程的負載情況。它會對線程在運行過程中的各種操作進行計數和時間統計,幫助開發者了解線程的繁忙程度,為系統的性能優化和資源分配提供數據支持。
TaskExecutorInterface 類:作為一個接口類,它定義了任務執行器的基本行為和功能規范。不同的任務執行器可以實現該接口,遵循統一的操作接口,從而實現多態性,方便代碼的擴展和維護。其中最重要的就是它提供的4個接口,是異步任務執行的基礎。
TaskExecutor 類:是具體的任務執行器類,實現了 TaskExecutorInterface 接口所定義的功能。它負責接收并執行各種任務,合理地分配系統資源,將任務分配到合適的線程或執行環境中進行處理。
EventPoller 類:是整個事件處理的核心類,負責事件的輪詢和管理。它能夠監聽網絡套接字的各種事件,通過事件循環不斷檢查事件狀態,并在事件發生時觸發相應的處理邏輯。
三、 事件循環驅動 - runLoop 函數
EventPoller::runLoop 函數是 EventPoller 類的核心函數,它實現了事件循環的邏輯,不斷地監聽網絡套接字的事件并進行處理。
void EventPoller::runLoop(bool blocked, bool ref_self) {if (blocked) {if (ref_self) {s_current_poller = shared_from_this();}_sem_run_started.post();_exit_flag = false;uint64_t minDelay;
#if defined(HAS_EPOLL)struct epoll_event events[EPOLL_SIZE];while (!_exit_flag) {minDelay = getMinDelay();startSleep();//用于統計當前線程負載情況int ret = epoll_wait(_event_fd, events, EPOLL_SIZE, minDelay ? minDelay : -1);sleepWakeUp();//用于統計當前線程負載情況if (ret <= 0) {//超時或被打斷continue;}_event_cache_expired.clear();for (int i = 0; i < ret; ++i) {struct epoll_event &ev = events[i];int fd = ev.data.fd;if (_event_cache_expired.count(fd)) {//event cache refreshcontinue;}auto it = _event_map.find(fd);if (it == _event_map.end()) {epoll_ctl(_event_fd, EPOLL_CTL_DEL, fd, nullptr);continue;}auto cb = it->second;try {(*cb)(toPoller(ev.events)); // 將epoll事件類型轉換回自定事件類型} catch (std::exception &ex) {ErrorL << "Exception occurred when do event task: " << ex.what();}}}
#elif defined(HAS_KQUEUE)// ... 其他系統的處理代碼
#else// ... 其他系統的處理代碼
#endif //HAS_EPOLL} else {_loop_thread = new thread(&EventPoller::runLoop, this, true, ref_self);_sem_run_started.wait();}
}
先屏蔽掉其它系統的網絡處理,可以看到該函數處理主要流程如下:
1、初始化和循環條件判斷:首先進行一些初始化操作,如設置當前的 EventPoller 實例,將退出標志 _exit_flag 置為 false,表示開始事件循環。
2、獲取最小延遲時間:調用 getMinDelay() 函數獲取最小延遲時間,用于 epoll_wait 的超時設置。
3、調用 epoll_wait 監聽事件:使用 epoll_wait 函數監聽網絡事件,當有事件發生時,該函數會返回發生事件的數量。
4、事件處理:遍歷發生事件的數組,對于每個事件,檢查其對應的文件描述符是否在 _event_map 中。如果存在,則調用相應的回調函數進行處理;如果不存在,則從 epoll 中刪除該文件描述符。
四、事件監聽的添加,修改和刪除
4.1 添加事件監聽AddEvent
int EventPoller::addEvent(int fd, int event, PollEventCB cb) {// 時間檢查TimeTicker();// 回調函數檢查if (!cb) {WarnL << "PollEventCB is empty";return -1;}// 當前線程檢查,// 如果是當前線程:根據不同的操作系統平臺選擇不同的事件通知機制來添加事件監聽。if (isCurrentThread()) {
#if defined(HAS_EPOLL)struct epoll_event ev = {0};ev.events = toEpoll(event) ;ev.data.fd = fd;int ret = epoll_ctl(_event_fd, EPOLL_CTL_ADD, fd, &ev);if (ret != -1) {_event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));}return ret;
#elif defined(HAS_KQUEUE)struct kevent kev[2];int index = 0;if (event & Event_Read) {EV_SET(&kev[index++], fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, nullptr);}if (event & Event_Write) {EV_SET(&kev[index++], fd, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, nullptr);}int ret = kevent(_event_fd, kev, index, nullptr, 0, nullptr);if (ret != -1) {_event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));}return ret;
#else
#ifndef _WIN32// win32平臺,socket套接字不等于文件描述符,所以可能不適用這個限制if (fd >= FD_SETSIZE) {WarnL << "select() can not watch fd bigger than " << FD_SETSIZE;return -1;}
#endifauto record = std::make_shared<Poll_Record>();record->fd = fd;record->event = event;record->call_back = std::move(cb);_event_map.emplace(fd, record);return 0;
#endif}// 如果不是當前線程,發起異步操作:// 使用 async 函數異步調用 addEvent 函數,將事件添加操作放到合適的線程中執行,并立即返回 0。async([this, fd, event, cb]() mutable {addEvent(fd, event, std::move(cb));});return 0;
}
EventPoller::addEvent 函數的主要功能是向事件輪詢器中添加一個文件描述符(fd)的事件監聽,并關聯一個回調函數(cb)。該函數會根據不同的操作系統平臺,采用不同的事件通知機制(如 epoll、kqueue 或 select)來添加事件監聽,同時會處理回調函數為空以及非當前線程調用的情況。
4.2 刪除事件監聽 delEvent
int EventPoller::delEvent(int fd, PollCompleteCB cb) {// 時間檢查TimeTicker();// 回調函數檢查if (!cb) {cb = [](bool success) {};}// 當前線程檢查// 如果是當前線程:根據不同的操作系統平臺選擇不同的事件通知機制來刪除事件監聽。if (isCurrentThread()) {
#if defined(HAS_EPOLL)int ret = -1;if (_event_map.erase(fd)) {_event_cache_expired.emplace(fd);ret = epoll_ctl(_event_fd, EPOLL_CTL_DEL, fd, nullptr);}cb(ret != -1);return ret;
#elif defined(HAS_KQUEUE)int ret = -1;if (_event_map.erase(fd)) {_event_cache_expired.emplace(fd);struct kevent kev[2];int index = 0;EV_SET(&kev[index++], fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);EV_SET(&kev[index++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);ret = kevent(_event_fd, kev, index, nullptr, 0, nullptr);}cb(ret != -1);return ret;
#elseint ret = -1;if (_event_map.erase(fd)) {_event_cache_expired.emplace(fd);ret = 0;}cb(ret != -1);return ret;
#endif //HAS_EPOLL}// 跨線程操作// 使用 async 函數異步調用 delEvent 函數,將事件刪除操作放到合適的線程中執行,并立即返回 0async([this, fd, cb]() mutable {delEvent(fd, std::move(cb));});return 0;
}
EventPoller::delEvent 函數的主要功能是從事件輪詢器中刪除指定文件描述符(fd)的事件監聽,并在操作完成后調用一個完成回調函數(cb)。該函數會根據不同的操作系統平臺,采用不同的事件通知機制(如 epoll、kqueue)來刪除事件監聽,同時會處理回調函數為空以及非當前線程調用的情況。
4.3 修改監聽事件類型 modifyEvent
int EventPoller::modifyEvent(int fd, int event, PollCompleteCB cb) {// 時間檢查TimeTicker();// 回調函數檢查與默認設置if (!cb) {cb = [](bool success) {};}// 當前線程檢查// 如果是當前線程:根據不同的操作系統平臺選擇不同的事件通知機制來修改事件監聽。if (isCurrentThread()) {
#if defined(HAS_EPOLL)struct epoll_event ev = { 0 };ev.events = toEpoll(event);ev.data.fd = fd;auto ret = epoll_ctl(_event_fd, EPOLL_CTL_MOD, fd, &ev);cb(ret != -1);return ret;
#elif defined(HAS_KQUEUE)struct kevent kev[2];int index = 0;EV_SET(&kev[index++], fd, EVFILT_READ, event & Event_Read ? EV_ADD | EV_CLEAR : EV_DELETE, 0, 0, nullptr);EV_SET(&kev[index++], fd, EVFILT_WRITE, event & Event_Write ? EV_ADD | EV_CLEAR : EV_DELETE, 0, 0, nullptr);int ret = kevent(_event_fd, kev, index, nullptr, 0, nullptr);cb(ret != -1);return ret;
#elseauto it = _event_map.find(fd);if (it != _event_map.end()) {it->second->event = event;}cb(it != _event_map.end());return it != _event_map.end() ? 0 : -1;
#endif // HAS_EPOLL}// 如果不是當前線程, 發起異步操作:// 使用 async 函數異步調用 modifyEvent 函數,將事件修改操作放到合適的線程中執行。async([this, fd, event, cb]() mutable {modifyEvent(fd, event, std::move(cb));});return 0;
}
EventPoller::modifyEvent 函數的主要功能是修改指定文件描述符(fd)對應的事件監聽設置,并在操作完成后調用一個完成回調函數(cb)。該函數會根據不同的操作系統平臺,采用不同的事件通知機制(如 epoll、kqueue)來修改事件監聽,同時會處理回調函數為空以及非當前線程調用的情況。
五、數據結構
5.1 _event_map
在 EventPoller 類中,有一個重要的數據結構 std::unordered_map<int, std::shared_ptr > _event_map,它用于存儲網絡事件的相關信息。
鍵(Key):為文件描述符(int 類型),每個文件描述符對應一個網絡套接字。
值(Value):為 std::shared_ptr 類型,它是一個智能指針,指向一個事件處理回調函數。當該文件描述符對應的網絡事件發生時,會調用該回調函數進行處理。
通過這個數據結構,EventPoller 能夠快速地查找和處理不同文件描述符的網絡事件,提高事件處理的效率。
5.2 _event_cache_expired的之作用
在EventPoller 類中,_event_cache_expired完整聲明為std::unordered_set _event_cache_expired;
5.2.1 在 delEvent 函數中的作用
int EventPoller::delEvent(int fd, PollCompleteCB cb) {// ...if (_event_map.erase(fd)) {_event_cache_expired.emplace(fd);// ...}// ...
}
標記待刪除的文件描述符:當調用 delEvent 函數刪除一個文件描述符對應的事件時,如果該文件描述符存在于 _event_map 中(即 _event_map.erase(fd) 返回 true),會將這個文件描述符添加到 _event_cache_expired 集合里。這樣做是為了在后續的事件處理循環中標記這個文件描述符已經被刪除,以便后續可以跳過對它的處理。
避免誤處理已刪除的事件:在多線程環境下,刪除事件和處理事件可能會并發執行。將待刪除的文件描述符添加到 _event_cache_expired 中,可以確保即使在刪除操作還未完全完成時,事件處理循環也能識別并跳過這些文件描述符,避免對已刪除的事件進行誤處理。
5.2.2 在 runLoop 函數中的作用
void EventPoller::runLoop(bool blocked, bool ref_self) {// ..._event_cache_expired.clear();for (int i = 0; i < ret; ++i) {struct epoll_event &ev = events[i];int fd = ev.data.fd;if (_event_cache_expired.count(fd)) {continue;}// ...}// ...
}
清空過期事件緩存:在每次 epoll_wait 調用返回且有事件發生時,首先會調用 _event_cache_expired.clear() 清空集合。這是因為每次新的事件輪詢周期開始時,需要重新判斷哪些文件描述符是需要跳過的,避免上一輪的過期標記影響當前輪次的事件處理。
過濾已標記的文件描述符:在遍歷 epoll_wait 返回的事件數組時,對于每個事件對應的文件描述符 fd,會檢查它是否存在于 _event_cache_expired 集合中。如果存在,說明這個文件描述符已經在 delEvent 函數中被標記為待刪除或需要跳過,此時會直接跳過對該事件的處理,繼續處理下一個事件,從而提高事件處理的效率和準確性。
5.2.3 總結
_event_cache_expired 集合在整個事件處理機制中起到了一個中間標記的作用,它在 delEvent 函數中標記需要刪除或跳過的文件描述符,在 runLoop 函數中過濾這些標記過的文件描述符,避免重復處理,確保事件處理的正確性和高效性,尤其在多線程環境下可以有效避免并發操作帶來的問題。
總結
本文深入分析了開源 zlmediakit 中針對網絡套接字的事件監聽和事件循環驅動機制。通過對 EventPoller 類和其基類的詳細剖析,我們了解了其核心函數 runLoop 的工作原理,以及相關函數和數據結構的作用。在 Linux 系統下,epoll 機制的使用使得網絡事件的處理更加高效和靈活。同時,AddEvent()、ModifyEvent() 和 delEvent() 函數提供了對網絡事件監聽的添加、修改和刪除操作,方便開發者根據業務需求進行動態調整。_event_map 數據結構則為網絡事件的管理提供了有效的支持。_event_cache_expired 作為 std::unordered_set 類型變量,在事件處理流程中起關鍵中間標記作用,在 delEvent 函數里標記待刪除文件描述符以避免多線程下誤處理已刪事件,在 runLoop 函數中先清空集合再過濾標記過的描述符來提升事件處理效率與準確性。
通過對這些機制的理解,開發者可以更好地利用 zlmediakit 進行網絡編程,實現高效、穩定的網絡通信。后續我們還將對異步事件和延時任務進行專門的分析,敬請期待。