目錄
Reactor
概念
分類
單Reactor單線程
單Reactor多線程
多Reactor多線程
項目介紹?
項目規劃
模塊關系
實現?
TimerWheel -- 時間輪定時器
定時器系統調用
時間輪設計
通用類型Any?
Buffer
Socket
Channel
Poller
EventLoop(核心)
eventfd?
設計思路
Connection
Accept
LoopThread
LoopThreadPool
TcpServer
EchoServer
測試
性能測試
HTTPServer
Reactor
概念
Reactor 模式是一種事件驅動的設計模式,廣泛應用于高并發、異步I/O場景下的系統設計。它通過將請求分發到適當的處理程序來管理輸入/輸出操作,并使得單線程能夠高效地處理大量的連接。
核心組件
Reactor (反應器)
- 負責監聽并接收所有類型的事件(如讀就緒、寫就緒等),然后將其分派給相應的處理器進行處理。它是整個模式的核心協調者。
Handlers (事件處理器)
- 每種具體的事件都有對應的Handler負責實際業務邏輯的執行。例如,“客戶端數據到達”的事件由專門的數據讀取Handler完成任務;而“數據發送完畢”則交由另一組Handler去清理資源或做其他后續工作。?
Demultiplexer (多路復用器)
- 這是一個抽象的概念,在操作系統層面通常指代像select(), poll()?或 Linux 特有的 epoll()?等 API 。它的作用是從眾多文件描述符里篩選出那些當前已經準備好可以進行 I/O 操作的對象集合交給上層應用繼續加工。
工作流程
當一個新事件發生時,比如有新的網絡連接進來或者是某個套接字變得可讀了:
- Demultiplexer會檢測到這個變化并將相關信息通知給Reactor;
- Reactor根據預先設定好的規則找到匹配此類型事件的那個特定handler實例;
- 最終把控制權轉移過去讓那個 handler 執行其職責范圍內的事務。
這種架構非常適用于需要同時監控大量獨立來源的情況之下,因為它避免了大量的阻塞等待時間浪費掉CPU周期數目的情況出現。
分類
單Reactor單線程
一個單獨的工作線程配合一個反應器(Reactor),管理所有客戶端連接和請求。
- 通過IO多路復?模型進?客?端請求監控
- 觸發事件后,進?事件處理
- 如果是新建連接請求,則獲取新建連接,并添加?多路復?模型進?事件監控。
- 如果是數據通信請求,則進?對應數據處理(接收數據,處理數據,發送響應)
在這個模型里,所有的I/O操作都是非阻塞(non-blocking)形式完成,并依賴于底層操作系統提供的事件通知功能(如select、poll或epoll等,我們使用的是epoll),當某個套接字(socket)準備就緒時會觸發對應的讀取(read) / 寫入(write)回調函數來實際處理數據交換任務。由于只存在唯一的一條執行路徑負責接收新來的鏈接以及服務于現存活躍連接上的交互動作。
優點:該架構相對簡單易于理解和維護,同時也能保證較高的性能水平,特別是在高負載條件下表現尤為突出,因為避免了頻繁創建銷毀線程帶來的開銷,也無需考慮線程安全的問題。
缺點:在面對計算密集型業務場景下它的劣勢便顯現出來了——如果當前正在運行的任務耗用了過多CPU資源,則可能會導致其他等待服務隊列中的項目延遲響應時間過長,并且無法及時獲取新連接,用戶體驗差;此外對于多核處理器環境而言無法充分利用硬件并行運算能力也是其固有缺陷之一。
單Reactor多線程
- Reactor線程通過I/O多路復?模型進?客?端請求監控
- 觸發事件后,進?事件處理
- 如果是新建連接請求,則獲取新建連接,并添加?多路復?模型進?事件監控。
- 如果是數據通信請求,則接收數據后分發給Worker線程池進?業務處理。
- ?作線程處理完畢后,將響應交給Reactor線程進?數據響應
這是最基礎的一種形式,主線程負責監聽所有的事件源(如Socket連接),當有事件到達時將其分發到預先創建好的一組工作者線程池中去實際處理業務邏輯。
優點:這種方式能夠充分利用現代CPU多核優勢,并行化地完成復雜的計算任務。
缺點:在高并發場景下,單一的Reactor即使不進行業務處理,但是也可能來不及處理大量新連接,導致延遲增加。雖然IO操作是線程池處理的,但是單Reactor處理能力受限于主線程,當有大量的事件需要分發時,主線程可能無法及時處理,導致事件積壓。這時候線程池的線程可能都處于空閑狀態,而主線程卻忙不過來,系統整體的吞吐量就被限制了。
另外,線程之間的切換開銷也是一個問題。雖然工作線程池可以處理多個任務,但線程數量增多的話,上下文切換的成本也會上升,尤其是在高并發場景下,頻繁的線程切換可能導致CPU資源被大量消耗,反而降低了效率。
多Reactor多線程
- 在主Reactor中處理新連接請求事件,有新連接到來則分發到?Reactor中監控
- 在?Reactor中進?客?端通信監控,有事件觸發,則接收數據分發給Worker線程池
- Worker線程池分配獨?的線程進?具體的業務處理
- ?作線程處理完畢后,將響應交給?Reactor線程進?數據響應
相較于第一種結構,此版本引入了專門用于接收新請求的“Acceptor”角色以及若干個獨立運行的子級Reactors實例。每個次級單元各自維護著一部分已經建立起來的會話鏈接并對其發生的各類操作做出反應;同時為了均衡負載壓力還可能涉及到動態調整分配比例等策略優化措施。這種分離機制不僅增強了可伸縮性和穩定性而且也便于模塊化的管理與擴展升級等工作開展實施。
注意:
- 并不意味著線程越多,服務器效率越高,因為線程越多,CPU線程上下文的切換就會越多,鎖資源競爭越激烈,串行多了,并發效率降低,并且CPU會因為大量的線程切換反而導致整體效率降低
- 所以有些設計是將業務線程去除,融合到從屬的Reactor的IO事件監控線程中,即從屬的Reactor需要IO事件監控、IO操作、業務處理三個任務
項目介紹?
本項目中,采用的是多Reactor多線程模式,一個Reactor對應一個線程,主Reactor只負責新連接的獲取,多個從屬Reactor進行連接的IO事件監控與業務處理,沒有采取線程池處理IO事件,主要是考慮線程數量過多反而導致CPU計算效率降低,頻繁的上下文切換。
項目規劃
整個項目劃分為兩個大模塊:
- Server模塊:實現基于事件驅動的主從Reactor模型的TCP服務器。
- 協議模塊:對當前的Reactor服務器提供應用層協議支持
其中Server模塊最關鍵,在這個模塊中,主要是要對于所有的鏈接以及線程進行管理,管理的主要內容有下面的三個方面
- 監聽連接管理:有新的連接到來時,要對于新的連接情況進行管理
- 通信連接管理:對于通信連接要進行管理
- 超時連接管理:當有連接處于超時狀態時,要對于這些已經超時的連接進行管理
所以基于上面的這三個主要的管理方式,又可以構建出下面的一些更加細致化的模塊
Buffer模塊:緩沖區模塊。
功能:用于實現C/S通信中服務端的用戶態的接收緩沖區和發送緩沖區功能。
意義:在實際的TCP服務器進行通信的時候,有可能會出現讀取上來的數據并不是一個完整的報文,在發送的時候可能TCP的發送緩沖區已滿,所以要對于這些沒有完全就緒的數據進行一個暫時的緩存管理的過程,所以要單獨設計出一個Buffer模塊,主要的功能就是要實現通信套接字的用戶態緩沖區
Socket模塊:套接字模塊
功能:對套接字操作做封裝的模塊,有socket、bind、listen、connect、accept、recv、send、創建一個服務端連接、創建一個客戶端連接等接口。
意義:使得程序中對套接字的各項操作更加簡便。
Channel模塊
功能:對一個描述符進行IO事件監控管理的模塊,實現對描述符可讀、可寫、錯誤、等事件的管理操作,當事件就緒后,對就緒的監控事件的處理也是由Channel模塊來負責的。
意義:對于描述符的監控事件在用戶態更容易維護,以及觸發事件后的操作流程更加簡便(事件就緒后直接調用Channel類的HandlerEvent函數即可)
Poller模塊
功能:對epoll系統調用做封裝的模塊,對外提供對epoll的IO事件監控的添加、修改、移除、獲取就緒事件的接口。
意義:使我們在對描述符進行事件監控的操作更加簡單。
TimerWheel模塊:定時器模塊
功能:向定時器添加一個任務后,該任務會在指定的時間后被執行,同時也可以刷新定時任務來延遲任務的執行事件。
設計:我們采用時間輪思想,即創建一個60個元素大小的數組(表示【0, 59】秒),并定義一個秒針變量(下標初始指向0),該秒針會一秒鐘移動一個元素,移動到哪里就釋放哪里的任務
意義:這個模塊是對Connection對象的生命周期管理,一個連接在規定的標準時間內沒發生任何時間,則該連接時非活躍連接,服務器沒必要維護一個非活躍連接,這會浪費系統資源,所以超時后會自動釋放連接。
EventLoop模塊
功能:進行連接管理的整合模塊(對連接的事件監控以及超時任務的管理都是由該模塊提供接口),也就是我們所說的one thread one eventloop中的loop,即從屬Reactor,這個模塊與線程一一對應。
設計:EventLoop中會組合一個定時器模塊、Poller模塊,用來對描述符進行IO事件監控操作、定時任務操作,并且在模塊中還會設置一個任務隊列,該任務隊列保證一個連接的所有操作都應該在同一個線程內部執行,所以對于需要在線程內部執行的函數,我們都封裝為一個可調用對象,push到EventLoop的任務隊列中,這樣就保證了對一個鏈接的所有操作都在同一個線程內部執行費,避免了線程安全而不得為每一個連接加鎖最終導致效率極低的問題。
由于任務隊列中的人物都是在事件就緒后產生的,所以如果epoll中沒有事件就緒,那么從屬Reactor執行流就會阻塞在epoll_wait中,導致任務隊列的任務遲遲得不到執行,為了避免發生這個問題,我們又創建了一個eventfd,將eventfd的讀事件也加入事件監控中,一旦我們向任務隊列中添加了一個任務,那么我們就需要向eventfd中寫入一個數據,此時epoll_wait至少會因為eventfd的讀事件就緒而返回就緒事件,在執行完就緒事件后,就可以執行任務隊列中的任務了。
意義:對于服務器的所有事件都是由EventLoop模塊完成的,每一個新獲取上來的連接都會創建一個Connection對象,每個Connection對象都會綁定一個EventLoop模塊和一個線程,這樣就可以避免線程安全問題,因為一個連接的所有操作都只在一個線程內執行。
Any模塊
功能:用于適配所有上層協議,不同的協議對應的上下文類型不同,我們需要提供接口供上層用戶修改支持的協議
意義:供上層用戶修改支持的協議
Connection模塊
功能:對一個通信連接整體管理的模塊,對一個連接的所有操作都是通過這個模塊進行的,Connection類包含一個EventLoop的指針對象,可以通過該指針對連接進行監控管理、定時任務管理。在Connection實例化對象時,在構造函數中為對每一個連接設置讀、寫、錯誤、關閉、任意事件回調函數。
意義:是對一個已經被獲取上來的連接做管理的模塊,增加鏈接操作的靈活和便捷性。
Acceptor模塊
功能:對監聽套接字做管理的模塊?
意義:當獲取了一個新連接的描述符之后,需要為該通信連接封裝一個Connection對象,為該連接設置各種回調函數(讀、寫、錯誤、任意、多個階段回調函數)。
Acceptor模塊本身并不知道一個連接產生了某個事件后該如何處理,因此獲取一個通信連接后,Connection對象的創建以及各個階段回調函數設置都是在服務器模塊進行的。
模塊關系
項目規劃中展示了各個模塊的基本功能和大致設計,但是整體來說還是思維較亂,所以下面我用幾張圖來把這些模塊之間的關系構建出來。
Connection模塊的邏輯圖
Connection通信模塊圖中包含四個大模塊:
- Buffer緩沖區模塊
- Socket套接字模塊
- Channel描述符事件管理模塊
- TcpServer設置的四個階段回調函數
Connection通過組合Buffer類、Socket類以及Channel類對象,設計出連接的讀回調、寫回調、錯誤回調、關閉回調以及任意事件回調,并在構造函數中對Channel的五個事件回調函數進行bind設置,以達到每一個Connection對象在被實例化之后,都有對應的事件就緒的回調函數。
其中Socket在五個回調函數的實現中發揮了從TCP接收緩沖區讀數據、向TCP發送緩沖區寫數據、關閉套接字這三個作用。
Channel發揮了當需要向TCP發送緩沖區寫入數據時監控連接的寫事件,數據寫完之后取消監控描述符的寫事件,以及當連接關閉時移除連接的事件監控這幾個作用。
Buffer則起到了緩沖數據的作用,因為從TCP接收緩沖區讀數據時讀上來的并不是一個完整的報文,所以此時需要用戶緩存,當發送數據時TCP的發送緩沖區可能已經滿了,此時寫數據就會阻塞,所以也需要緩沖區等待寫事件就緒。
并且Connection類中還有EventLoop指針指向一個EventLoop,所以Channel對象對外提供連接的監控接口實際上就是通過EventLoop組合的Poller類對象實現的,因為一個線程對應一個EventLoop,一個EventLoop對應一個計時器模塊和一個epoll,如果一個連接屬于該EventLoop,則該連接的事件監控就應該被設置在EventLoop的epoll中。所以Connection對外提供的事件監控操作、定時任務操作接口都是通過EventLoop指針間接調用的Poller接口、TimerWheel接口,例如啟動、取消非活躍連接銷毀,刷新活躍度這幾個接口函數。
Acceptor模塊邏輯圖?
?對于Acceptor類來說,我們需要單獨為其設置 一個讀回調函數,因為這是一個監聽連接,并不是一個通信連接,對于通信連接所設置的讀回調是將TCP接收緩沖區數據拷貝到inbuffer中,而listen讀事件就緒應該調用accept從全連接隊列獲取新連接,為新連接創建struct file等內核數據結構分配文件描述符,所以Acceptor類需要組合Channel來管理listen的文件描述符讀事件,需要組合Socket類來調用accept接口。
但是Acceptor模塊本身并不知道監聽連接讀事件就緒后該如何處理,因此獲取一個通信連接后,Connection對象的創建以及各個階段回調函數設置都是在服務器模塊進行的,所以需要TcpServer模塊傳入OnConnected回調函數進行設置后續動作。
EventLoop模塊邏輯圖?
在第一個Connection的邏輯圖中我們已經講到了EventLoop的組合關系:一個線程對應一個EventLoop,一個EventLoop對應一個計時器模塊和一個epoll,各個從屬Reactor互不干擾,避免了因線程安全而不得不加鎖,從而導致整體效率降低問題。
Channel包含了EventLoop指針,Connection也包含了EventLoop指針,并且Connection又組合了Channel對象管理Connection對應連接的文件描述符的事件監控任務,Connection又對Channel設置了五個回調函數,所以Channel也相當于回指了Connection,至此
Channel :Connection :EventLoop = m?: m?: n? (m >> n)
(m指通信連接的數量,n指從屬Reactor的數量)
實現?
TimerWheel -- 時間輪定時器
定時器系統調用
在進行TimerWheel 模塊中,需要用到一個定時器的概念,而在之前的內容中沒有對于這部分內容進行總結,因此這里對于這個定時器的內容進行學習,首先認識一下定時器的相關接口信息:
#include <sys/timerfd.h>
int timerfd_create(int clockid, int flags);
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);// 參數中包含的結構體
struct timespec {time_t tv_sec; /* Seconds */long tv_nsec; /* Nanoseconds */
};struct itimerspec {struct timespec it_interval; /* Interval for periodic timer */struct timespec it_value; /* Initial expiration */
};
timerfd_create函數,這個函數的功能是創建一個定時器
- 對于第一個參數clockid來說,有兩個選項:
- CLOCK_REALTIME:表示的是以系統的時間為基準值,這是不準確的,因為如果系統的時間出現了問題可能會導致一些其他的情況出現
- CLOCK_MONOTONIC:表示的是以系統啟動時間進行遞增的一個基準值,也就是說這個時間是不會隨著系統時間的改變而進行改變的
- 第二個參數是flag,也就是所謂的標記位,這里我們選擇是0表示的是阻塞操作
函數的返回值是一個文件描述符,因為Linux下一切皆文件,所以對于這個函數來說其實就是打開了一個文件,對于這個定時器的操作就是對于這個文件的操作,定時器的原理其實就是在定時器的超時時間之后,系統會給這個描述符對應的文件定時器當中寫入一個8字節的數據,當創建了這個定時器之后,假設定時器中創建的超時時間是3秒,那么就意味著每3秒就算是一次超時,那么從啟動開始,每隔3秒,系統就會給描述符對應的文件當中寫入一個1,表示的是從上一次讀取到現在超時了1次,假設在30s之后才讀取數據,那么會讀上來的數據是10,表示的是從上一次讀取到現在實踐超出限制了10次
timerfd_settime函數,啟動定時器
- 函數的第一個參數是第一個函數的返回值,這個文件描述符其實也是創建的定時器的標識符
- 第二個標記位表示的是使用的是相對時間,默認給0
- 后面的兩個參數也很好理解,表示的是新的時間和舊的時間,不需要就置空即可
下面寫一份實例代碼:
#include <iostream>
#include <unistd.h>
#include <sys/timerfd.h>
using namespace std;int main()
{// 創建一個定時器int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);struct itimerspec itm;// 設置第一次超時的時間itm.it_value.tv_sec = 1;itm.it_value.tv_nsec = 0;// 設置第一次超時后,每隔多長時間超時一次itm.it_interval.tv_sec = 1;itm.it_interval.tv_nsec = 0;timerfd_settime(timerfd, 0, &itm, nullptr);while(true){uint64_t tmp;int ret = read(timerfd, &tmp, sizeof(tmp));if(ret < 0)return -1;cout << "超時的次數:" << tmp << endl;sleep(3);}close(timerfd);return 0;
}
上面的例子,就是一個定時器的使用實例,借助這個使用的例子就可以進行判斷出每隔1秒超時器就會超時一次,然后向文件中寫入一個1,我們這里sleep了3秒,所以后面的讀取數據中都是超時了3次?。
時間輪設計
在上述的例子當中,存在一個比較大的問題,每次超時都要把所有的連接遍歷一次,這樣的效率是比較底下的,所以衍生出了一個新的方案,時間輪:
現在定義一個二維動態數組(vector<vector<Task>>),其中包含一個秒針下標,秒針指向的是數組的起始位置,這個秒針每秒鐘向后走一步,走到哪里就代表哪里的任務需要被執行了,那么如果想要定一個3秒之后的任務,那么只需要將任務添加到當前秒針的后三個元素的位置,讓其每秒鐘走一格,那么走到對應的位置就可以執行對應位置的任務即可。
但是時間輪的類型設計依舊不合適
如果vector<vector<Task>>只存放Task,那么秒針走到哪里就執行哪里的任務,這看起來并沒有什么問題,但是關鍵點在于如何刷新任務?
如果要將舊的超時任務“拿出來”并根據當前秒針位置向后偏移超時時間,然后插入,看似簡單但實際很難操作
- 如何知道舊的超時任務在哪里?我們是根據秒針的位置push_back到秒針所處的一個vector中,而秒針一直在移動(不能刻舟求劍吧)
- 知道了在哪個vector中,那么怎么找到哪個元素是我們的定時任務?大家類型都相同,都是一個沒有參數的可調用對象
即使上面兩個問題都能實現,那也定然需要使用更多的容器、內存來輔助,但是我們有一個更巧妙的方法:存放shared_ptr智能指針 ?vector<vector<std::shared_ptr<TimerTask>>>
存放一個管理超時任務對象資源的shared_ptr智能指針,這是因為如果一個非活躍的連接在即將超時時,突然事件就緒了,那么該連接的活躍度應該刷新,那么為了實現刷新功能,我們使用了shared_ptr智能指針的引用,如果有兩個shared_ptr共同管理一個資源,那么只有兩個shared_ptr都結束生命,最終才會delete管理的資源,并調用管理資源的析構函數。
我們將超時后發生的事件放在了TimerTask的析構函數中,將任務的執行與TimerTask對象的生命周期進行綁定,一旦TimerTask生命周期結束,那么自動調用析構函數,所以在析構的時候就會調用我們的超時事件,所以可以巧妙的與shared_ptr結合,實現任務的刷新功能,只需要將之前的shared_ptr拷貝一個shared_ptr,再添加到一個新的超時時間中,那么第一個shared_ptr銷毀時由于引用計數不為0,并不會釋放資源。
關鍵點:為了能在之后刷新時,找到上一個超時時間的shared_ptr智能指針,我們需要一個容器來存儲事件對應的shared_ptr,這里又有一個關鍵,我們不能在容器中使用shared_ptr做管理,因為這會增加引用計數!所以需要使用weak_ptr,weak_ptr可以使用資源但是不會增加引用計數
最后的clear也是一個關鍵點,clear會刪除容器內所有元素,刪除的原理是vector的賦值運算符重載函數在賦值之前會delete釋放自身的資源,這就會調用數據shared_ptr的析構函數(內部再判斷--引用計數是否為0),從而deleteTimerTask,調用TimerTask的析構函數,進而執行我們期待的超時執行任務。
實現
TimerTask:
- 需要對外提供一個接口,用來設置析構時應該回調的函數。
- 因為TimerWheel中timers哈希表表示一個任務是否還在等待超時,如果TimerTask調用了析構函數,就表明沒有shared_ptr管理TImerTask了,所以也需要將timers中記錄的信息刪除,我們將刪除這一操作也放到TImerTask的析構函數中處理。
TimerWheel:
- capacity來指明時間輪的大小,即最大可以設置的超時時間,我們默認為60,因為一個連接一般最多非活躍60s,我們就需要釋放連接了,否則干耗著資源不使用,降低服務器效率。
- 一個EventLoop包含一個計時器模塊,所以EventLoop對外提供的計時器接口實際上是間接調用的計時器模塊的接口
- 為了保證1s鐘我們的秒針運動一次,我們使用內核的timerfd來控制,監控timerfd的讀事件
- 為了管理timerfd我們需要一個channel對象
- 在構造函數中,直接bind設置timerfd的讀回調為類內的OnTime成員函數,并監控它的讀事件
- 我們在TcpServer中會使用id唯一標識一個連接,所以我們在定時器使用時只需要傳入該連接的id、延遲時間以及超時任務即可
- 對外提供對conn_id的添加超時任務、刷新超時任務、刪除超時任務、判斷超時任務是否存在四個接口
如何實現1s鐘我們的指針后移一位,也就是執行一次Run函數?
顯然不能寫一個死循環,Run之后sleep 1s。這一方面會阻塞執行流,另一方面并不能保證其他操作以及Run耗時為0。
這里就需要使用到我們上面講解的 tiemrfd,我們將create一個timerfd,設置內核每1s向timerfd寫入一個8字節大小的1,并通過channel對象回指的EventLoop,間接調用EventLoop中的Poller接口,將tiemrfd的讀事件放到epoll中進行監控,那么一旦timerfd讀事件就緒了,就表明距離上一次已經過去了1s或多s,會過去多s這是因為一個EventLoop的epoll會監控大量事件(一般一個從屬Reactor能支持1w - 10w的并發量),所以timerfd的就緒事件可能在很靠后的位置,那么從屬Reactor執行前面的就緒事件并進行業務處理可能會耗時多秒,但是沒有關系,因為內核每1s都會向timerfd寫入一個8字節的1,所以只要在timerfd的讀回調中根據讀到的timer次數,來決定秒針移動幾步即可。
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
// 每一個超時任務
class TimerTask
{
private:uint64_t _id; // 標識任務iduint32_t _timeout; // 任務需要設置的延遲事件TaskFunc _task_cb; // 當超時后需要做的任務,void()類型函數,所以參數由用戶自己bind傳入ReleaseFunc _release; // 當該任務被執行時,需要將TimerWheel中的_timers哈希表中刪除我這個任務,表明我任務完成了,不存在了bool _iscancel; // 是否取消了該任務,false 表示任務沒有被取消,true表示任務被取消了
public:TimerTask(uint64_t id, uint32_t timeout, const TaskFunc &cb) : _id(id), _timeout(timeout), _task_cb(cb), _iscancel(false) {}~TimerTask() { if (_iscancel == false)_task_cb();_release();}// 對外提供設置release回調函數的接口void SetRelease(const ReleaseFunc& cb) { _release = cb;}void Cancel() { _iscancel = true; }uint32_t GetTimeout() { return _timeout; }
};// shared_ptr就是設計精髓,可以讓計時的事件刷新
class TimerWheel
{
private:int _tick; // 秒針int _capacity; // 控制時間輪的長度,秒單位std::vector<std::vector<std::shared_ptr<TimerTask>>> _wheel; // 時間輪std::unordered_map<uint64_t, std::weak_ptr<TimerTask>> _timers; // 用來尋找之前的shared_ptr,但是需要用weak_ptr接收,不能讓_timers參與引用計數int _timerfd; // 定時器描述符EventLoop* _loop; // 回指到EventLoop,使用EventLoop的接口設置監聽事件std::unique_ptr<Channel> _timer_channel; // 管理timerfd的連接事件
private:void TimerErase(uint64_t id){auto it = _timers.find(id);if (it == _timers.end())return;_timers.erase(it);}static int CreateTimerfd(){// 創建int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);if (timerfd < 0){ERR_LOG("timerfd create error!");abort();}struct itimerspec itime;// 設置超時時間itime.it_value.tv_sec = 1;itime.it_value.tv_nsec = 0;// 設置超時后的下一次超時時間itime.it_interval.tv_sec = 1;itime.it_interval.tv_nsec = 0;// 設置進內核timerfd_settime(timerfd, 0, &itime, nullptr);return timerfd;}// 當timerfd讀事件就緒了,就把數據堵上來,不然的話epoll會一直就緒int ReadTimerfd(){uint64_t times;int ret = read(_timerfd, ×, sizeof(times));if (ret < 0){ERR_LOG("timerfd read error!");abort();}return times;}// 將秒針每秒加一,如果有任務就執行,實際動作就是clearvoid Run(){_wheel[_tick].clear(); // 釋放空間意味著shared_ptr被調用析構函數,啟動對應任務_tick = (_tick + 1) % _capacity; // 秒針往后走}// 1s時間到嘍!內核會向timerfd寫入一個1,所以此時epoll事件就緒,進行秒針 + 1void OnTime(){// 每次時間到了,把數據讀一下,如果不讀的話,epoll會一直就緒int times = ReadTimerfd();// 然后再執行超時任務,秒針后移
/* 因為有可能一個連接的業務處理時間很長,例如30s,那么在這30s內秒針應該走30次,但是由于從屬線程一直在執行業務處理,而沒空去讀取timerfd,所以當真正秒針在移動并執行超時任務時,需要知道內核通知了幾次,通知了幾次就表明過去了幾秒,也就意味著Run幾次
*/for (int i = 0; i < times; i++){Run();}}// 向時間輪中添加超時事件void AddTimerTaskInLoop(uint64_t id, uint32_t delay, const TaskFunc& cb){std::shared_ptr<TimerTask> pt(new TimerTask(id, delay, cb));pt->SetRelease(std::bind(&TimerWheel::TimerErase, this, id));int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);_timers[id] = std::weak_ptr<TimerTask>(pt);} // 刷新超時時間void ReflushTimerTaskInLoop(uint64_t id){// 先判斷之前在_timers中有沒有記錄auto it = _timers.find(id);if (it == _timers.end())return;// 有記錄則獲取上一次計時任務的shared_ptrstd::shared_ptr<TimerTask> pt = it->second.lock(); // 獲取哈希表中存放的weak_ptr,使用lock函數獲取weak_ptr共享的shared_ptr對象// 更新超時時間,放到新的時間輪內int pos = (_tick + pt->GetTimeout()) % _capacity;_wheel[pos].push_back(pt);}/* 取消一個TimerTask注意:不能直接從時間輪中刪除,因為刪除操作就意味著shared_ptr自動調用析構函數,就扭曲了我們的意愿,變為了提前執行超時時間,而不是取消一個超時時間所以,我們要在TimerTask中添加一個字段iscancel,表明該事件是否被取消了,如果被取消了,就不要在析構函數中執行方法*/void CancelTimerTaskInLoop(uint64_t id){auto it = _timers.find(id);if (it == _timers.end()) return;std::shared_ptr<TimerTask> pt = it->second.lock();if (pt) pt->Cancel();}
public:TimerWheel(EventLoop* loop) : _tick(0), _capacity(60), _wheel(_capacity) , _loop(loop), _timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd)) {// 將tiemrfd加入事件監聽,因為內核每1秒鐘使timerfd超時一次,即事件就緒一次,所以就調用我們綁定的回調函數OnTimer,執行所有超時任務并將秒針+1_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));_timer_channel->EnableRead();}/*如果多個線程同時操作成員timers哈希表,那么一定會引發線程安全問題,我們當然可以加鎖,但是加鎖會導致效率降低,我們此時采用空間換時間的方式,也將定時器的所有操作任務都放到同一個EventLoop的任務隊列中,在一個線程中串行執行任務不會引發線程安全問題*//*因為下面三個函數用到了_loop的成員函數,所以需要在EventLoop類的下面進行類外定義*/void AddTimerTask(uint64_t id, uint32_t delay, const TaskFunc& cb);void ReflushTimerTask(uint64_t id);void CancelTimerTask(uint64_t id);// 該定時任務是否存在bool HasTimer(uint64_t id){auto it = _timers.find(id);if (it == _timers.end())return false;return true;}
};
通用類型Any?
對于Connection來說,它的工作任務之一是要對于連接進行管理,那么就意味著這個模塊是會涉及到對于應用層協議的處理的,因此在Connection中要設置協議處理的上下文來控制處理節奏
應用層的協議是有很多的,平時使用最多的是http協議,不過也會有例如ftp協議這樣的存在,而為了使得本項目可以支持的協議足夠多,那么就意味著不能固定寫死類型,而是可以存儲任意協議的上下文信息,因此就需要設計一個通用類型來保存各種不同的數據
我們想要做成的效果是,這個Any類,可以接受各種類型的數據,例如有這樣的用法:
Any a;
a = 10;
a = "abc";
a = 12.34;
...
那該如何設計這個通用類型Any?
這里參考了一種嵌套類型,在一個類中嵌套存在一個新的類,在這個類中存在模板,而進而對于類進行處理??
class Any
{
private:class holder{// ...};template <class T>class placeholder : public holder{T _val;};holder *_content;
};
在這個Any類中,成員保存的是holder類的指針,當Any類容器需要保存一個數據的時候,只需要通過placeholder子類實例化一個特定類型的子類對象出來,讓這個子類對象保存數據即可,具體原理為多態:
Any保存的是一個父類指針,那么子類重寫父類的虛函數,通過父類指針調用虛函數時就會實現動態的多態,這就是Any的原理。?
實現?
?注意:
- 拷貝構造、賦值運算符重載的現代寫法
- 拷貝時需要借助placeholder提供的clone方法,因為拷貝構造的其實是placeholder
- 對外提供get方法,獲取placeholder的對象,即協議上下文
class Any
{
private:class holder {public:virtual ~holder() {}virtual const std::type_info& type() = 0;virtual holder *clone() = 0;};template<class T>class placeholder: public holder {public:placeholder(const T &val): _val(val) {}// 獲取子類對象保存的數據類型virtual const std::type_info& type() { return typeid(T); }// 針對當前的對象自身,克隆出一個新的子類對象virtual holder *clone() { return new placeholder(_val); }public:T _val;};holder *_content;
public:Any():_content(NULL) {}template<class T>Any(const T &val):_content(new placeholder<T>(val)) {}Any(const Any &other):_content(other._content ? other._content->clone() : nullptr) {}~Any() { delete _content; }Any &swap(Any &other) {std::swap(_content, other._content);return *this;}// 返回子類對象保存的數據的指針template<class T>T *get() {//想要獲取的數據類型,必須和保存的數據類型一致assert(typeid(T) == _content->type());return &((placeholder<T>*)_content)->_val;}//賦值運算符的重載函數template<class T>Any& operator=(const T &val) {//為val構造一個臨時的通用容器,然后與當前容器自身進行指針交換,臨時對象釋放的時候,原先保存的數據也就被釋放Any(val).swap(*this);return *this;}Any& operator=(const Any &other) {Any(other).swap(*this);return *this;}
};
測試:?
Buffer
作為緩沖區模塊,Buffer是實現通信用戶的接收緩沖區和發送緩沖區域的功能。我們采用的是vector容器來實現Buffer,并沒有采用stirng,因為我們會頻繁的在Buffer中存放數據,取出數據,如果使用string的substr,那么就會產生大量的數組移動,這是降低效率的,雖然vector添加或刪除一部分數據也會發生大量的數組元素移動構,但是我們會使用讀、寫下標維護一個可讀空間,取出數據是只需要移動讀下標即可,string更偏向于字符串的操作,而對于緩沖區來說用數組來實現更為合適。
Buffer負責兩個功能:
- 寫入數據,從寫下標開始,將需要寫入輸入緩沖區的數據拷貝到寫下標開始的空間中
- 讀取數據,從讀下標開始,將用戶需要的指定長度的數據返回,前提是讀下標不能超過寫下標
成員變量:
- vector
- 讀下標
- 寫下標
對外接口:
因為我們對已經無效的數據并不采取刪除操作,當有新數據到來后直接覆蓋寫入即可,所以我們維護的兩個下標會在vector中劃分出三個區域:
[0, read_pos)? [read_pos, write_pos)? [write_pos, vector.size() - 1]
從緩沖區讀取數據時從讀下標開始讀,向緩沖區寫數據時從寫下標開始。
向緩沖區寫數據時,尾部的空閑空間可能不足,此時需要判斷尾部空閑空間加上頭部的空閑空間是否滿足寫入的數據大小需求,如果滿足,則將讀下標到寫下標內的數據移動至0下標,然后寫入數據。
如果頭部空間空間+尾部空閑空間不能滿足寫入數據的大小,則不移動數據,字節resize擴容vector,這樣就避免了頻繁移動數組元素的效率問題。
- 獲取當前寫位置地址以及const版本
- 獲取當前讀位置地址以及const版本
- 尾部空閑空間大小
- 頭部空閑空間大小
- 獲取??可寫數據大小
- 獲取可讀數據大小
- 讀下標后移
- 寫下標后移
- 擴容函數
- 從緩沖區讀數據
- 向緩沖區寫數據
- 獲取一整行數據。方便后期處理協議
- 清理功能
實現
class Buffer
{static const int default_size = 1024;private:std::vector<char> _buffer; // 緩沖區uint64_t _write_pos; // 寫下標uint64_t _read_pos; // 讀下標public:Buffer() : _buffer(default_size), _write_pos(0), _read_pos(0) {}void Clear() { _write_pos = _read_pos = 0; }// 返回寫下標空間地址char *WritePostion() { return &*(_buffer.begin() + _write_pos); }// 返回讀下標空間地址char *ReadPosition() { return &*(_buffer.begin() + _read_pos); }// const版本返回寫下標空間地址const char *WritePosition() const { return &*(_buffer.begin() + _write_pos); }// const版本返回讀下標空間地址const char *ReadPosition() const { return &*(_buffer.begin() + _read_pos); }// 尾部空閑大小uint64_t TailSize() const { return _buffer.size() - _write_pos; }// 頭部空閑空間uint64_t HeadSize() const { return _read_pos; }// 獲取可寫數據大小uint64_t WriteAbleSize() const { return TailSize() + HeadSize(); }// 獲取可讀數據大小uint64_t ReadAbleSize() const { return _write_pos - _read_pos; }// 讀下標后移void MoveReadPos(uint64_t len){// 這里只進行讀下標移動,至于空間是否足夠讓Read函數自行判斷assert(len <= ReadAbleSize());_read_pos += len;}// 寫下標后移void MoveWritePos(uint64_t len){// 這里只進行寫下標移動,至于空間是否足夠讓Read函數自行判斷,程序到了這里空間必然足夠assert(len <= TailSize());_write_pos += len;}// 擴容函數void Reserve(uint64_t len){if (len <= TailSize())return;else if (len <= WriteAbleSize()){uint64_t size = ReadAbleSize();// 將所有數據向前移動std::copy(_buffer.begin() + _read_pos, _buffer.begin() + _write_pos, _buffer.begin());_read_pos = 0;_write_pos = size;}else_buffer.resize(_write_pos + len);}// 從緩沖區讀數據void Read(void *buf, uint64_t len){// 判斷是否有足夠數據可讀assert(len <= ReadAbleSize());std::copy(ReadPosition(), ReadPosition() + len, static_cast<char*>(buf));// 更新讀下標MoveReadPos(len);}// 重載Readstd::string Read(uint64_t len){assert(len <= ReadAbleSize());std::string str(len, '\0');// 注意:不能直接用c_str(),因為這是const char*,不允許通過指針修改內容Read(&str[0], len);// std::cout << "++++++++" << str << std::endl;return str;}// 獲取一整行數據std::string Getline(){std::string tmp;uint64_t i = _read_pos;for (; i < _write_pos; ++i){if (_buffer[i] != '\n') tmp += _buffer[i];else break;}// 找不到\n,表明沒有完整的一行數據,則返回空if (i == _write_pos) return "";// 否則返回一整行數據,包括\ntmp += _buffer[i++];MoveReadPos(i - _read_pos);return tmp;}// 向緩沖區寫數據void Write(const void *data, uint64_t len){if (len == 0) return;if (!data) return;Reserve(len);std::copy(static_cast<const char*>(data), static_cast<const char*>(data) + len, WritePostion());MoveWritePos(len);}// 重載string類型的write函數void Write(const std::string &str){Write(str.c_str(), str.size());}// 重載本類類型的write函數void Write(const Buffer &buffer){Write(buffer.ReadPosition(), buffer.ReadAbleSize());}
};
Socket
對socket系統調用的封裝模塊,沒有什么需要注意的,我們已經在網絡部分聯系很多次了
?對外接口:
- Create創建套接字
- Bind綁定套接字
- Listen監聽套接字
- Connect客戶端發起連接
- Accept服務端從全連接隊列獲取新連接
- Recv從TCP的接收緩沖區讀取數據
- Send向TCP的發送緩沖區寫入數據
- CreateServer創建一個服務端連接
- CreateClient創建一個客戶端連接
- NonBlock設置套接字非阻塞
- ReuseAddr設置端口復用,避免因服務器崩潰,導致服務器是先揮手的一方,從而進入TimeWait狀態,此時會保留TCP連接的四元組,維持兩個MSL時間,如果立即重啟服務器進程會導致端口綁定失敗,所以我們設置端口復用,可以使服務器進程退出后依然能立即重啟
實現
class Socket
{// 全連接隊列最大值static const int backlog = 1024;
private:int _sockfd; // 客戶端:連接套接字。 服務端:監聽套接字
public:Socket() : _sockfd(-1) {}Socket(int sockfd) : _sockfd(sockfd) {}~Socket() { Close(); }void Close(){if (_sockfd != -1){close(_sockfd);_sockfd = -1;}}int Fd() { return _sockfd; }// 創建套接字bool Create(){_sockfd = socket(AF_INET, SOCK_STREAM, 0);if (_sockfd < 0){ERR_LOG("Create socket Error!");return false;}return true;}// 服務端bind套接字,客戶端不需要手動bind,避免app之間端口沖突,以及防止app非法綁定多個端口號bool Bind(const std::string& ip, uint16_t port){struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t addrlen = sizeof(local);if (bind(_sockfd, reinterpret_cast<const struct sockaddr*>(&local), addrlen) < 0){ERR_LOG("Bind socket Error!");return false;}return true;}// 服務器開啟套接字監聽狀態,監聽是否有連接請求,成功則放入全連接隊列bool Listen(){if (listen(_sockfd, backlog) < 0){ERR_LOG("Listen socket Error!");return false;}return true;}// 服務端從全連接隊列取建立成功的連接int Accept(){int newfd = accept(_sockfd, nullptr, nullptr);if (newfd < 0){ERR_LOG("Accept Error!");return -1;}return newfd;}// 客戶端向 ip:port 發起TCP連接請求,內核自動完成三次握手bool Connect(const std::string ip, uint16_t port){struct sockaddr_in addr;memset(&addr, 0, sizeof(addr));addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(addr);int n = connect(_sockfd, reinterpret_cast<const struct sockaddr*>(&addr), len);if (n < 0){ERR_LOG("Connect Error!");return false;}return true;}// 接收數據。同樣要指明fdssize_t Recv(void* buf, size_t len, int flag = 0){ssize_t n = recv(_sockfd, buf, len, flag);if (n <= 0){// n == 0表示寫端關閉,n < 0則錯誤碼被設置if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)return 0;ERR_LOG("socket recv error!");return -1;}return n;}ssize_t NonBlockRecv(void* buf, size_t len){return Recv(buf, len, MSG_DONTWAIT);}// 發送數據。這里有問題:如果是服務端,要指明套接字fd!ssize_t Send(const void* buf, size_t len, int flag = 0){ssize_t n = send(_sockfd, buf, len, flag);if (n <= 0){if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)return 0;ERR_LOG("socket send error!");return -1;}return n;}ssize_t NonBlockSend(const void* buf, size_t len){if (len == 0) return 0;return Send(buf, len, MSG_DONTWAIT);}// 創建一個服務端連接bool CreateServer(uint16_t port, const std::string& ip = "0.0.0.0", bool block_flag = false){// 1. 創建套接字 2. 設置地址、端口復用 3. 綁定套接字 4. 監聽套接字if (!Create() || !ReuseAddr() || !Bind(ip, port) || !Listen())return false;// 是否設置套接字非阻塞,即Reactor的ET或LT模式if (block_flag) NonBlock();return true;}// 創建一個客戶端連接bool CreateClient(uint16_t port, const std::string& ip, bool block_flag = false){if (!Create() || !Connect(ip, port))return false;if (block_flag) NonBlock();return true;}// 設置套接字為非阻塞bool NonBlock(){int flag = fcntl(_sockfd, F_GETFL);if (flag < 0){ERR_LOG("SetNonBlock Error!");return false;}fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);return true;}// 設置端口復用,避免因服務端主動斷開連接進入TIME_WAIT狀態而在短時間內無法啟動服務bool ReuseAddr(){int opt = 1;if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) < 0){ERR_LOG("Setsockopt Error!");return false;}return true;}
};
?
測試
Channel
我們在項目規劃中已經講解,Channel是用來負責對一個描述符IO事件管理的模塊,實現對描述符可讀,可寫,錯誤...事件的管理操作,以及Poller模塊對描述符進?IO事件監控就緒后,根據不同的事件,回調不同的處理函數功能。
當epoll中事件就緒了,就執行Channel中提供的事件處理函數,根據相應的就緒事件來調用對應的回調函數。
對外接口:
- 監控讀事件
- 監控寫事件
- 關閉讀事件監控
- 關閉寫事件監控
- 在epoll中取消fd的事件監控
- 設置五個回調函數
- HandlerEvent事件就緒處理函數
實現
細節:一個連接只要觸發了就緒事件,那么我們就應該調用一次任意事件回調函數,在任意事件回調函數中,我們會為該連接刷新活躍度,并調用上層設置的階段回調函數(因為可能上層也需要這個業務場景)
那么如果 fd 對應的連接出現了錯誤,例如對端連接異常關閉,我們recv、send出錯了,這時在Channel的HandlerEvent中,我們應該調用該連接的 close 回調函數,關閉這個異常連接,但是又因為在任意事件觸發之后,我們都要調用任意事件回調函數,所以先關閉連接再調用任意事件回調會發生段錯誤,因為連接已經關閉,Connection對象已經銷毀,而我們是在Connection中為Channel綁定的函數,連接關閉意味著程序會在HandlerEvent函數中解引用空指針并調用回調函數。
為了解決這個問題,我們當然可以先執行任意事件回調,再去執行關閉連接回調,但是由于后面也引出了一個問題(Connection的Release函數引發的問題),所以我們實際的close并不會直接關閉連接,而是會等待就緒事件處理完畢再釋放連接,所以我們可以統一的將任意事件回調放在最后
class Poller;
class EventLoop;
class Channel
{using EventCallBack = std::function<void()>;
private:EventLoop* _loop; // 回指Poller類,從而使用EventLoop中封裝的epoll接口進行事件監控int _fd; // 關心事件的文件描述符fduint32_t _events; // 關心的事件uint32_t _revents; // 就緒的事件EventCallBack _read_cb; // 可讀事件觸發后執行的回調函數 EPOLLINEventCallBack _write_cb; // 可寫事件觸發后執行的回調函數 EPOLLOUTEventCallBack _error_cb; // 錯誤事件觸發后執行的回調函數 EPOLLERREventCallBack _close_cb; // 連接斷開事件觸發后執行的回調函數 EPOLLHUPEventCallBack _event_cb; // 任意事件觸發后執行的回調函數
public:Channel(EventLoop* loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) {}int Fd() { return _fd; }// 外部獲取關心的事件uint32_t GetEvents() { return _events; }// 外部設置就緒事件void SetRevents(uint32_t revents) { _revents = revents; }// 設置回調函數void SetReadCallback(const EventCallBack& cb) { _read_cb = cb; }void SetWriteCallback(const EventCallBack& cb) { _write_cb = cb; }void SetErrorCallback(const EventCallBack& cb) { _error_cb = cb; }void SetCloseCallback(const EventCallBack& cb) { _close_cb = cb; }void SetEventCallback(const EventCallBack& cb) { _event_cb = cb; }// 是否監控了fd的讀、寫事件bool ReadAble() { return _events & EPOLLIN; }bool WriteAble() { return _events & EPOLLOUT; }// 監控讀事件void EnableRead() { _events |= EPOLLIN; Update(); }// 監控寫事件void EnableWrite() { _events |= EPOLLOUT; Update(); }// 關閉讀事件監控void DisableRead() { _events &= ~EPOLLIN; Update(); }// 關閉寫事件監控void DisableWrite() { _events &= ~EPOLLOUT; Update(); }// 關閉監控fd的全部事件void DisableAll() { _events = 0; Update(); }// 在 epoll 中添加、修改、移除 _fd 對應的事件監控// 由于使用到了EventLoop類中封裝的Poller類中的函數,而EventLoop類在下面才定義,所以需要將下面兩個函數的定義放在EventLoop類下面void Remove();void Update();// 事件分配器,一旦觸發了事件,就執行HandlerEvent,具體調用哪個函數由HandlerEvent決策void HandlerEvent(){// EPOLLIN -- 讀事件就緒 EPOLLHUP -- 該連接關閉 EPOLLPRI -- 攜帶帶外數據的高優先級事件就緒,這三者都需要進行讀回調if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){if (_read_cb) _read_cb();}// 有可能釋放連接的操作,一次只能執行一次,避免錯誤發生if (_revents & EPOLLOUT){if (_write_cb) _write_cb();}else if (_revents & EPOLLERR) // EPOLLERR 表示錯誤{// 異常的情況連接、套接字都會關閉,所以任意事件回調需要在異常處理之前調用if (_error_cb) _error_cb();}else if (_revents & EPOLLHUP) // EPOLLHUP 表示連接還未建立{if (_close_cb) _close_cb();}// 任意事件回調if (_event_cb) _event_cb();}
};
Poller
同Socket模塊一樣,是對epoll三個系統調用的一個封裝,便于我們后續程序中的調用。
成員:
- _channels,一個維護epoll所監控的所有事件的哈希表,管理文件描述符與對應事件管理的的Channel對象
- epfd,保存創建epoll返回到文件描述符,即epoll的操作句柄
- evs數組,用來保存已經就緒的事件的epoll_event結構體
對外接口:
- 添加或更新對描述符的事件監控
- 移除對描述符的事件監控
?邏輯流程:
- 對描述符進行監控,但是需要監控描述符的什么事件,由Channel來提供
- 當描述符的事件就緒之后,應該調用描述符的哪個事件回調,也由Channel來負責
class Poller
{static const int num = 1024;
private:int _epfd; // epoll的fdstruct epoll_event _evs[num]; // 用于接收就緒事件的數組std::unordered_map<int, Channel*> _channels; // fd與對應Channel映射
private:// 對epoll的直接操作void Update(Channel* channel, int op){int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->GetEvents();int ret = epoll_ctl(_epfd, op, fd, &ev);if (ret < 0)ERR_LOG("epoll_ctl error!");}// 判斷一個Channel是否已經添加到了事件監控bool HasChannel(Channel* channel){auto it = _channels.find(channel->Fd());if (it == _channels.end()) return false;return true;}
public:Poller() : _epfd(-1) {_epfd = epoll_create(num);if (_epfd < 0){ERR_LOG("epoll_create error!");abort();}}// 添加或修改事件void UpdateEvent(Channel* channel){auto it = _channels.find(channel->Fd());if (it == _channels.end()){// 沒有找到就是添加事件int fd = channel->Fd();_channels[fd] = channel;Update(channel, EPOLL_CTL_ADD);}else Update(channel, EPOLL_CTL_MOD); // 否則添加事件}// 移除監控void RemoveEvent(Channel* channel){auto it = _channels.find(channel->Fd());if (it != _channels.end()){epoll_ctl(_epfd, EPOLL_CTL_DEL, channel->Fd(), nullptr);_channels.erase(it);}}// 開始監控,返回活躍連接void Poll(std::vector<Channel*>* active){int n = epoll_wait(_epfd, _evs, num, -1);if (n < 0){if (errno == EINTR)return;ERR_LOG("epoll wait error!, error: %s", strerror(errno));abort();}// 將就緒的channel*返回for (int i = 0; i < n; i++){auto it = _channels.find(_evs[i].data.fd);assert(it != _channels.end());it->second->SetRevents(_evs[i].events);active->push_back(it->second);}}
};
EventLoop(核心)
EventLoop是我們服務器模塊的統籌管理模塊,因為我們所說的 one loop per thread 或 one thread one EventLoop,中的loop指的就是EventLoop,主Reactor、從屬Reactor都需要獨立的一個EventLoop。
eventfd?
在對于EventLoop模塊的學習前,要先看一下eventfd這個函數,它的核心功能就是一種事件的通知機制,簡單來說,當調用這個函數的時候,就會在內核當中管理一個計數器,每當向eventfd當中寫入一個數值,表示的就是事件通知的次數,之后可以使用read來對于數據進行讀取,讀取到的數據就是通知的次數
假設每次給eventfd寫一個1,那么就表示通知了一次,通知三次之后再進行讀取,此時讀取出來的就是3,讀取了之后這個計數器就會變成0
eventfd的應用場景在本項目中,是用于EventLoop模塊中實現線程之間的事件通知功能的
返回值?
一個文件描述符?
參數?
initval
- 計數初值?
flags
- EFD_CLOEXEC,禁止進程復制
- EFD_NONBLOCK,啟動非阻塞屬性
讀取操作也都是使用read、write,因為一切皆文件,但是注意在讀寫的時候要讀寫8字節大小?
設計思路
EventLoop組合了一個Poller和一個計時器,并且EventLoop與線程一一對應。
為什么 一個線程要有一個對應的EventLoop?
如果我們不將EventLoop與一個線程一一對應,那么一個連接放在EventLoop的Poller中監控,一旦事件就緒,那么外接的線程池可能會爭搶著處理該就緒事件,那么這就引發了線程安全問題,我們不得不進行加鎖,而加鎖就意味著服務器整體效率的降低,每一個連接都加一個鎖這是不現實的,一臺百萬級并發量的服務器,如果為每一個連接都加鎖,這個效率和內存消耗是很龐大的。
因此哦我們需要將一個連接的事件監控、連接的事件處理以及其他操作都放在同一個線程中進行,這樣就避免了因加鎖導致的效率問題。
如何保證一個連接的所有操作都在EventLoop對應的線程中??
為EventLoop添加一個任務隊列,對連接的所有操作都進行一次封裝,即對連接的操作并不直接執行,而是當做任務添加到任務隊列中。
既然一個線程對應一個Connection,那么該線程不就是會串行的執行epoll返回的就緒事件嗎?我這個連接也沒有放在其他線程的EventLoop中監控啊,為什么會引發害怕多線程爭搶搞一個任務隊列,保證一個連接的所有事件都是在同一個線程內執行?
在muduo網絡庫中,EventLoop引入任務隊列的主要目的是為了支持更復雜的并發場景以及解耦業務邏輯與事件循環本身。雖然在一個簡單的模型下,每個線程確實只處理屬于它的那些連接(即一個線程對應若干個Connection對象),但在實際應用中有以下幾個原因促使需要實現任務隊列:
- 跨線程操作:盡管當前連接綁定到特定的 EventLoop 和其所在的線程,但是可能存在從其他線程提交的任務希望影響該連接的行為。例如,某些控制指令可能來自另一個管理線程而不是直接由 IO 事件觸發。
- 延后執行:有時候我們需要將一些非緊急的操作推遲到主事件循環中去完成,這可以避免阻塞關鍵路徑上的工作。通過把任務加入隊列交給 eventloop 來統一調度,就可以做到這一點。
- 保證順序一致性:如果同一連接產生了多種不同類型的任務(如讀取完成后寫回響應),利用任務隊列可以幫助保持這些任務按照正確的順序被執行而無需擔心亂序問題。
因此即便是在單一線程環境下運行某條連接的所有回調函數,使用任務隊列仍然能提供靈活性和更好的結構化設計思路。所以可以說 RunInLoop
是圍繞著多線程協作及內部任務調度這一目標展開的重要功能之一。
EventLoop處理流程
- 在線程中對描述符進行事件監控
- 事件就緒則進行事件處理?
- 所有就緒事件處理完畢后再處理任務隊列中的事件
注意:因為處理任務隊列中的任務是在就緒事件之后的,也就會引發一個問題,如果epoll中沒有就緒事件而阻塞在epoll_wait,那么任務隊列中的任務就得不到執行,所以我們需要一個通知機制,來喚醒事件監控的阻塞,即eventfd。
我們還可以擴展從屬Reactor,對從屬Reactor外接線程池,將任務隊列中的任務交給線程池來執行,所以為了日后方便接入線程池,我們對task進行加鎖保護,當需要訪問任務隊列時,直接swap一個新的任務隊列,這樣就避免了每次取任務都要加鎖的時間消耗。
實現
注意:
- 在EventLoop與定時器整合時,需要注意定時器中需要使用EventLoop中的RunInLoop函數,所以即使定時器類實現在EventLoop類之前,我們也必須將涉及到使用EventLoop中的RunInLoop函數的函數放在EventLoop定之后定義。?
- 構造函數內添加對eventfd的讀事件監控,一旦我們向任務隊列push任務后,就向eventfd寫入一個數據,這樣就避免了epoll阻塞問題
- 在加鎖時,我們可以使用unique_ptr智能指針管理鎖,并配合{}限制智能指針的生命周期,從而達到出了作用域鎖自動釋放的操作
- 對外提供Start函數,啟動從屬Reactor的工作,函數內無限循環三個流程:
- 在線程中對描述符進行事件監控
- 事件就緒則進行事件處理?
- 所有就緒事件處理完畢后再處理任務隊列中的事件
class EventLoop
{using func = std::function<void()>;
private:std::thread::id _thread_id; // 標記當先線程的id,以便于后續區分一個連接的操作是否在同一個EventLoop中int _eventfd; // eventfd,當epoll阻塞時,向eventfd寫入數據,此時epoll因事件就緒而返回std::unique_ptr<Channel> _event_channel; // 管理eventfd連接的事件監控Poller _poller; // 管理所有fd的事件監控,一個EventLoop對應一個線程、對應一個epollstd::vector<func> _tasks; // 任務隊列,讓所有的對同一個連接的操作放在同一個線程內部std::mutex _mutex; // 對任務隊列加鎖TimerWheel _timer_wheel; // 定時器模塊public:// 執行_tasks中所有任務void RunTasks(){std::vector<func> t;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(t);}for (auto& f : t){f();}}static int CreateEventfd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0){ERR_LOG("create eventfd error!");abort();}return efd;}// eventfd的可讀事件回調函數void ReadEventfd(){uint64_t val = 0;int n = read(_eventfd, &val, sizeof(val));if (n < 0){if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)return;ERR_LOG("read eventfd error!");abort();}}// 向eventfd文件寫入數據,使得eventfd的讀事件就緒,從而使epoll脫離阻塞void WakeUpEventfd(){uint64_t val = 1;int n = write(_eventfd, &val, sizeof(val));if (n < 0){if (errno == EINTR)return;ERR_LOG("write eventfd error!");abort();}}
public:EventLoop():_eventfd(CreateEventfd()),_event_channel(new Channel(this, _eventfd)),_thread_id(std::this_thread::get_id()), _timer_wheel(this){// 設置eventfd的可讀事件回調函數,并開始監聽eventfd讀事件_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));_event_channel->EnableRead();}void Start(){while (true){// 1. 事件監控std::vector<Channel*> actives;_poller.Poll(&actives);// 2. 事件處理for (auto& channel : actives){channel->HandlerEvent();}// 3. 執行任務RunTasks();}}// 判斷當前線程是否是EventLoop對應的線程bool IsInLoop() { return _thread_id == std::this_thread::get_id(); }void AssertInLoop(){assert(_thread_id == std::this_thread::get_id());}// 判斷是否執行的任務是處于當前線程中,如果是則執行,不是則放入任務隊列void RunInLoop(const func& cb){if (IsInLoop()) cb();else QueueInLoop(cb);}// 將對連接的操作放入任務隊列void QueueInLoop(const func& cb){// 加鎖,然后將回調函數放入任務隊列{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}// 喚醒此時可能阻塞的epoll,其實就是向eventfd寫入數據,從而使得eventfd的讀事件就緒WakeUpEventfd();}// 添加、修改描述符的事件監控void UpdateEvent(Channel* channel){_poller.UpdateEvent(channel);}// 移除描述符的事件監控void RemoveEvent(Channel* channel){_poller.RemoveEvent(channel);}// 對外提供定時任務的添加、刷新、刪除操作void AddTimerTask(uint64_t id, uint32_t delay, const TaskFunc& cb){_timer_wheel.AddTimerTask(id, delay, cb);}void ReflushTimerTask(uint64_t id){_timer_wheel.ReflushTimerTask(id);}void TimerCancel(uint64_t id){_timer_wheel.CancelTimerTask(id);}/* 是否存在某個定時任務,該函數存在線程安全問題,因為如果多線程調用,可能會引發數據不一致問題,所以只能在模塊內,在對應的EventLoop線程內執行*/bool HasTimer(uint64_t id){return _timer_wheel.HasTimer(id);}
};
調用Channel的EnableRead流程:
- 先調用Channel內部的EnableRead
- 然后回調Update函數
- Update內又回調了EventLoop類的成員函數UpdateEvent函數,并傳入channel指針
- EventLoop類的成員函數UpdateEvent函數又回調了成員變量Poller的成員函數UpdateEvent
- 至此我們就將一個fd的監控事件放入了epoll中
實際上是Channel回指了EventLoop,EventLoop組合了Poller,所以Channel可以根據EventLoop找到Poller,間接的調用Poller的接口將Channel管理的事件添加的epoll的監控中
添加一個定時任務?
實際上就是將一個函數bind到定時器超時時執行的回調函數上?
Connection
是對一個通信連接的全方位管理,我們對通信連接的所有操作都是通過這個整合了各個小模塊后的Connection模塊提供的
成員:
- 套接字管理,能夠進行套接字的操作
- 連接事件的就緒后的回調函數,可讀、可寫、錯誤、掛斷、任意事件共五種事件回調函數
- 緩沖區管理,便于socket數據的接收和發送
- 協議上下文的管理,記錄請求數據的處理過程,反序列化
- 回調函數的設置
- 階段處理函數共四個
- 服務端從TCP接收緩沖區收到數據后,數據該如何處理,需要用戶決定,因此必須有業務處理回調函數
- 一個連接建立成功后,用戶可以設定一些行為,因此有連接建立成功的回調函數
- 一個連接關閉前,用戶可以設定一些行為,因此有連接關閉的回調函數
- 任意事件的產生后,用戶可以設定一些行為,因此有任意事件回調函數
功能:
- 發送數據,實際上并不是直接發送數據,而是把數據寫入到發送緩沖區中,然后啟動寫事件監控
- 關閉連接,實際上也不是直接關閉連接,而是在釋放連接之前,看看輸入輸出緩沖區是否還有數據待處理
- 啟動、取消非活躍連接的超時銷毀功能,因此需要一個bool值來表明連接是否啟動非活躍銷毀功能
- 協議切換,一個連接收到數據后該如何進行業務處理,取決于上下文以及數據的業務處理回調函數
Connection是對連接管理的模塊,對連接的所有操作都是通過這個模塊完成的。
場景:當對連接進行多線程操作時,可能會出現連接已經被釋放,但是后續又對連接進行了操作,導致內存訪問已經被釋放的空間,最終導致程序崩潰。
解決方案:使用智能指針shared_ptr對Connection對象進行管理,這樣就能保證任意一個地方對Connection對象操作時,該對象依舊存在,即使其他地方釋放了他的智能指針
實現
注意點:
- 需要組合我們前面編寫的Socket、Buffer、Channel(需要回指EventLoop)模塊,最終實現五個通信連接的事件回調函數?
- 讀回調:從TCP接收緩沖區讀取數據,寫入到我們的inbuffer緩沖區,如果inbuffer中有數據則調用上層設置的業務處理函數
- 寫回調:一旦調用寫回調就意味著可以向TCP發送緩沖區寫數據了,所以我們直接寫入即可,如果寫入失敗了則需要關閉連接,但是在關閉連接之前需要判斷發送緩沖區是否還有數據,如果有數據,則再執行一次業務處理函數,然后進行關閉。如果寫入成功,則判斷outbuffer是否還有數據,如果沒有就取消寫事件監控,如果還處于關閉連接狀態,則關閉連接不在判斷發送緩沖區是否還有數據了。
- 掛斷:判斷一下inbuffer是否還有數據,如果有則調用一次業務處理函數,然后關閉連接。
- 錯誤:直接復用掛斷回調,因為兩者操作相同
- 任意回調:如果關心連接的活躍度則刷新超時任務,然后調用組件使用者提供的任意事件回調函數
- 在構造函數內直接對Channel進行事件回調函數的bind,以達到只要實例化一個Connection對象,該對象就有相應的五個事件回調函數
- 啟動非活躍連接銷毀功能,實際上就是將Connection中的Release函數作為了超時任務
特別注意:?
因為一個從屬Reactor對應一個線程、一個EventLoop,EventLoop中又含有一個獨立的epoll,線程這個執行流會循環式的調用EventLoop的Start函數進行三個操作的無限循環,是串行執行的。當服務器達到了一個瓶頸,在一次業務處理中花費了大量時間,那么這就會導致在他之后的就緒事件直接超時,超時了那么定時器直接執行超時任務Release函數,移除事件監控、關閉套接字,如果有后續的定時任務則取消、調用上層的關閉回調函數(TcpServer中設置的RemoveConnection函數,會deleteConnection對象),所以在執行后面的就緒任務時就會因為Connection對象已經銷毀而導致訪問非法空間導致段錯誤。
雖然是服務器性能造成的一個業務處理事件過長而導致拖累了其他連接,導致其他連接也超過了超時時間,但這是服務器性能的問題,跟計時器無關,計時器只負責超時銷毀,這是一種解耦合。
但是為了避免計時器直接執行超時任務,關閉連接導致本輪epoll后續的就緒事件在執行時Connection對象已經不存在了,所以我們要將Release也封裝為一個任務壓入從屬線程的任務隊列。
所以我們應該能體會為什么ReleaseInLoop中需要取消定時器中該連接對應的超時任務,因為會有這樣一個風險,該連接的銷毀任務已經被壓入任務隊列,但是本輪epoll后續的就緒事件在執行后會刷新定時任務,這會導致下一次秒針走到該定時任務處時,要執行超時任務釋放連接時,Connection對象已經不在了,所以在ReleaseInLoop中我們需要判斷定時器中是否還有我們的任務,如果有則取消。
typedef enum
{Disconnected, // 連接關閉狀態Connecting, // 連接建立成功,待處理狀態Connected, // 連接建立完成,各種設置已完成,可以通信Disconnecting // 待關閉狀態
}ConnStatus;class Connection : public std::enable_shared_from_this<Connection>
{
/*使用智能指針shared_ptr對Connection對象進行管理,這樣就能保證任意一個地方對*/
/*Connection對象操作時,該對象依舊存在,即使其他地方釋放了他的智能指針*/using ConnectionCallback = std::function<void(const std::shared_ptr<Connection>&)>;using MessageCallback = std::function<void(const std::shared_ptr<Connection>&, Buffer*)>;using CloseCallback = std::function<void(const std::shared_ptr<Connection>&)>;using AnyEventCallback = std::function<void(const std::shared_ptr<Connection>&)>;
private:uint64_t _conn_id; // 唯一標識一個連接的IDint _sockfd; // 連接對應的fdbool _is_inactive_release; // 連接是否啟動非活躍銷毀標志位,默認false, 因為上層可能有長連接的需求ConnStatus _status; // 連接處于何種狀態EventLoop* _loop; // 回指EventLoop,讓所有操作都在loop對應的線程上進行操作,避免線程安全問題Channel _channel; // 對連接的事件管理Socket _socket; // 套接字管理Any _context; // 上層是何種協議Buffer _inbuffer; // 輸入緩沖區Buffer _outbuffer; // 輸出緩沖區// 某個階段的回調函數,是上層需要的業務處理函數,是上層在服務器模塊設置的ConnectionCallback _conn_cb;// 連接建立后,上層需要的業務處理函數MessageCallback _msg_cb; // 接收數據后,上層需要的業務處理函數CloseCallback _close_cb; // 連接關閉后,上層需要的業務處理函數AnyEventCallback _event_cb; // 任意事件觸發,上層需要的業務處理函數// 因為在后面的服務器模塊會使用shared_ptr保存所有連接記錄,所以一旦有連接需要關閉// 那么就應該從管理的地方移除自己的信息,類似計時器模塊那里的TimerTask的Release回調CloseCallback _server_close_cb;
private:// 連接事件就緒后,回調的五個函數void HandlerRead(){ char buffer[65536];ssize_t n = _socket.NonBlockRecv(buffer, 65535);if (n < 0){// 返回值小于0,表明一定是出現了連接大問題,沒必要保留連接了// 但是不能直接關閉連接,因為inbuffer、outbuffer可能還有數據,需要處理完再關閉連接return ShutdownInLoop();}// 放入緩沖區_inbuffer.Write(buffer, n);// 緩沖區有數據,則調用MessageCallback進行業務處理if (_inbuffer.ReadAbleSize() > 0){// shared_from_this -- 從當前對象獲取自身的shared_ptr,需要繼承一個模板類return _msg_cb(shared_from_this(), &_inbuffer);}}// 觸發寫事件void HandlerWrite(){ssize_t n = _socket.NonBlockSend(_outbuffer.ReadPosition(), _outbuffer.ReadAbleSize());if (n < 0){// 返回值小于0,表明一定是出現了連接大問題,沒必要保留連接了,// 最后再看一次inbuffer是否還有數據if (_inbuffer.ReadAbleSize() > 0)_msg_cb(shared_from_this(), &_inbuffer);// 進行實際的關閉,不再判斷發送緩沖區是否還有數據了return ReleaseInLoop();}// outbuffer的讀指針移動_outbuffer.MoveReadPos(n);if (_outbuffer.ReadAbleSize() == 0){// 如果outbuffer沒有數據了,那就關閉寫事件監控_channel.DisableWrite();// 如果此時還是連接關閉狀態,那把鏈接也關閉了if (_status == Disconnecting)return Release();}}// 觸發掛斷事件void HandlerClose(){// 一旦連接掛斷了,套接字什么都干不了了,因此有數據待處理就處理一下if (_inbuffer.ReadAbleSize() > 0)_msg_cb(shared_from_this(), &_inbuffer);// 進行實際的關閉,不再判斷發送緩沖區是否還有數據了Release();}// 觸發出錯事件void HandlerError(){HandlerClose();}// 觸發任意事件:1. 刷新連接的活躍度 2. 調用組件使用者提供的回調函數void HandlerEvent(){// 如果關心連接活躍度則刷新活躍度if (_is_inactive_release)_loop->ReflushTimerTask(_conn_id);if (_event_cb) _event_cb(shared_from_this());}/*為了線程安全,所以將對連接的所有操作都封裝為一個任務,*/
/*push到loop的任務隊列中,讓loop對應的線程串行的執行任務*/void EstablishedInLoop(){// 1. 修改連接狀態, 當前函數執行完則連接進入已完成連接狀態assert(_status == Connecting);_status = Connected;// 2. 啟動讀事件監控_channel.EnableRead();// 3. 調用回調函數if (_conn_cb) _conn_cb(shared_from_this());}void ReleaseInLoop(){// 1. 修改連接狀態_status = Disconnected;// 2. 移除連接的事件監控_channel.Remove();// 3. 關閉描述_socket.Close();// 4. 如果定時器中還有我們的定時銷毀任務,則取消任務if (_loop->HasTimer(_conn_id))CancelInactiveReleaseInLoop();// 5. 調用關閉回調函數,如果先調用移除服務器管理的回調函數,那么再去調用_close_cb會訪問已經釋放的空間,所以先調用關閉回調函數if (_close_cb) _close_cb(shared_from_this());if (_server_close_cb) _server_close_cb(shared_from_this());}// 將數據放到緩沖區,啟動可寫事件監控void SendInLoop(Buffer& buffer) {if (_status == Disconnected) return;_outbuffer.Write(buffer);if (!_channel.WriteAble()) _channel.EnableWrite();}// 并非實際的連接釋放操作,需要判斷緩沖區是否還有殘留void ShutdownInLoop(){// 設置為半關閉狀態_status = Disconnecting;if (_inbuffer.ReadAbleSize() > 0){if (_msg_cb) _msg_cb(shared_from_this(), &_inbuffer);}// 要么寫入數據的時候出錯關閉,要么就是沒有待發送的數據,直接關閉if (_outbuffer.ReadAbleSize() > 0){if (!_channel.WriteAble())_channel.EnableWrite();}if (_outbuffer.ReadAbleSize() == 0)Release();}void EnableInactiveReleaseInLoop(int time) {// 1. 將_is_inactive_release 置為 true_is_inactive_release = true;// 2. 如果當前定時銷毀任務已經存在,則刷新任務if (_loop->HasTimer(_conn_id))_loop->ReflushTimerTask(_conn_id);// 3. 如果不存在,則新增定時任務else_loop->AddTimerTask(_conn_id, time, std::bind(&Connection::Release, this));}void CancelInactiveReleaseInLoop() {_is_inactive_release = false;if (_loop->HasTimer(_conn_id))_loop->TimerCancel(_conn_id);}void UpgradeInLoop(const Any& context, const ConnectionCallback& conn_cb, const MessageCallback& msg_cb, const CloseCallback& close_cb, const AnyEventCallback& event_cb){_context = context;_conn_cb = conn_cb;_msg_cb = msg_cb;_close_cb = close_cb;_event_cb = event_cb;}
public:Connection(EventLoop* loop, uint64_t conn_id, int sockfd) : _sockfd(sockfd), _socket(sockfd), _conn_id(conn_id), _loop(loop), _is_inactive_release(false), _status(Connecting) // 雖然內核三次握手了,但是我們還沒有對連接進行各種回調設置,所以我們任務處于連接建立狀態, _channel(loop, _sockfd){ _channel.SetReadCallback(std::bind(&Connection::HandlerRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandlerWrite, this));_channel.SetCloseCallback(std::bind(&Connection::HandlerClose, this));_channel.SetErrorCallback(std::bind(&Connection::HandlerError, this));_channel.SetEventCallback(std::bind(&Connection::HandlerEvent, this));// 不能在這里直接監控可讀事件,會因為定時任務出問題}~Connection() { DBG_LOG("release connection: %p", this); }int Fd() { return _sockfd; }uint64_t id() { return _conn_id; }// 連接是否處于Connected狀態bool status() { return _status == Connected; }// 設置、獲取上下文void SetContext(const Any& context) { _context = context; }Any* GetContext() { return &_context; }// 設置階段的回調函數void SetConnectedCallback(const ConnectionCallback& cb) { _conn_cb = cb; }void SetMessageCallback(const MessageCallback& cb) { _msg_cb = cb; }void SetClosedCallback(const CloseCallback& cb) { _close_cb = cb; }void SetAnyEventCallback(const AnyEventCallback& cb) { _event_cb = cb; }void SetSvrClosedCallback(const AnyEventCallback& cb) { _server_close_cb = cb; }// 連接獲取后,對連接的各種設置回調、啟動事件監控void Established(){_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));}// 將數據寫入發送緩沖區,然后啟動寫事件監聽void Send(const char* data, size_t len) {// 因為外界傳入的data可能是一個臨時空間,而我們將send任務push到了任務池,該任務在執行前data空間可能就已經被銷毀了// 因此我們需要構建一個變量保存data數據Buffer buf;buf.Write(data, len);_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));}// 提供給組件使用者的接口,并不是真正關閉連接,需要檢查緩沖區是否還有數據void Shutdown(){_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));}void Release() {_loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this));}// 啟動非活躍連接的超時自動銷毀,傳入超時時間timevoid EnableInactiveRelease(int time) {_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, time));}// 取消非活躍連接的超時自動銷毀void CancelInactiveRelease() {_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));}// 協議切換 --> 重置上下文、階段回調函數 --- 非線程安全void Upgrade(const Any& context, const ConnectionCallback& conn_cb, const MessageCallback& msg_cb, const CloseCallback& close_cb, const AnyEventCallback& event_cb){_loop->AssertInLoop();_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn_cb, msg_cb, close_cb, event_cb));}
};
Accept
這個模塊的意義主要是對于監聽套接字進行管理,主要的功能是:
- 創建一個監聽套接字
- 啟動讀事件監控
- 事件觸發后,獲取新連接
- 調用新連接獲取成功后的回調函數
- 為新連接創建Connection進行管理
對于新連接如何處理應該是服務器模塊來管理的,即設置給listenfd的讀回調函數用于處理新連接。因為主Reactor是專門負責獲取新連接的,一個Reactor對應一個EventLoop,所以主線程作為主Reactor的專屬線程,必然需要一個EventLoop對象支持listen的各種操作,所以需要外賣傳入EventLoop指針
class Acceptor
{using AcceptCallback = std::function<void(int)>;
private:Socket _socket; // 監聽套接字管理EventLoop* _loop; // 對監聽套接字進行事件監控管理Channel _channel; // 監聽套接字事件管理AcceptCallback _accept_cb; // 監聽到新連接時的回調函數
private:// 監聽到新連接時的讀事件回調函數void HandlerRead(){int newfd = _socket.Accept();if (newfd < 0) return;if (_accept_cb) _accept_cb(newfd);}int CreateServer(int port){bool ret = _socket.CreateServer(port);assert(ret == true);return _socket.Fd();}
public:/*注意:啟動讀事件監控不能放在構造函數內!因為如果構造函數啟動了讀事件監控之后,立馬來了一個連接*//*那么此時我們的AcceptCallback還沒有設置,所以調用HandlerRead時不會調用_accept_cb,這樣就會導致*//*新連接得不到服務,并且newfd泄露!*/Acceptor(EventLoop* loop, uint16_t port): _loop(loop), _channel(loop, _socket.Fd()), _socket(CreateServer(port)){_channel.SetReadCallback(std::bind(&Acceptor::HandlerRead, this));}void SetAcceptCallback(const AcceptCallback& cb) { _accept_cb = cb; }void Listen() { _channel.EnableRead(); }
};
LoopThread
該模塊的功能主要是來把EventLoop模塊和線程結合在一起,要形成的最終效果是,讓EventLoop和線程是一一對應的。在EventLoop模塊實例化的對象,在構造的時候就會初始化線程的id,而當后面需要運行一個操作的時候,就判斷是否運行在EventLoop模塊對應的線程中,如果是就代表是一個線程,不是就代表當前運行的線程不是EventLoop線程
不能直接在主線程內實例化三個EventLoop,因為此時EventLoop內的thread_id都會被設置為主線程的ID,所以必須在每個新線程內部進行實例化EventLoop,并且在構造EventLoop對象到設置新的線程id這個期間是不可控的
所以就要構造一個新的模塊,LoopThread,這個模塊的意義就是把EventLoop和線程放到一塊,主要的思路就是創建線程,在線程中實例化一個EventLoop對象,這樣可以向外部返回一個實例化的EventLoop
流程:
- 創建線程
- 在線程中實例化EventLoop對象
接口:?
- 返回實例化的EventLoop?
實現?
class LoopThread
{
private:std::thread _thread; // 一個EventLoop對應一個線程EventLoop* _loop; // 從屬Reactor管理所有fd事件std::mutex _mutex; // 為了保證獲取_loop之前,loop已經被初始化,所以需要線程鎖std::condition_variable _cond; // 線程同步控制獲取已經初始化的_loop
private:// 線程函數執行例程void ThreadEntry(){// 實力化EventLoopEventLoop loop;{// 初始化EventLoop的時候,就別打斷,所以加鎖std::unique_lock<std::mutex> lock(_mutex);_loop = &loop;// 喚醒可能早已經調用GetLoop而等待的執行流_cond.notify_all();}loop.Start();}
public:LoopThread():_thread(std::thread(&LoopThread::ThreadEntry, this)), _loop(nullptr){}// 外部獲取EventLoop*EventLoop* GetLoop(){EventLoop* loop = nullptr;{std::unique_lock<std::mutex> lock(_mutex);// 如果_loop為空,則等待_cond.wait(lock, [&](){ return _loop != NULL; });//loop為NULL就一直阻塞loop = _loop;}return loop;}};
?
細節:?
- 在線程內初始化EventLoop
- GetLoop函數,在外部可以獲取到EventLoop對象,方便新連接指向EventLoop
- 有可能線程創建了,但是EventLoop還沒有實例化,但是外部調用了GetLoop函數,此時會獲得nullptr,這就表明該連接會得不到服務,所以我們需要加鎖,在獲取Loop之前必須等待初始化。
- 我們不使用new創建堆上的EventLoop,而是在線程執行的例程中實例化一個局部對象,這樣就會使EventLoop生命周期與ThreadLoop綁定,因為在線程執行的例程中,我們會執行loop的start函數,這個start是一個死循環,一旦出錯了那么就是從屬reactor取消了,相應的EventLoop也應該銷毀
LoopThreadPool
主Reactor就是我們的主線程,只負責連接的獲取。從屬Reactor負責新連接的事件監控及處理。
設計思想:
- 針對LoopThread設計一個線程池,便于使用者對線程的控制
- 對所有的LoopThread進行管理及分配
功能:
- 線程數量可配置
- 根據用戶傳遞的參數決定線程的數量,因此從屬線程數量可能為0,也就是單Reactor模式的服務器,此時表示服務器是輕量級的,主線程既負責獲取連接,又負責事件的處理。
- 對所有線程進行管理,就是管理 【0,n】 個 LoopThread 對象
- 提供線程分配的功能
- 如果有0個從屬線程,則直接分配給主線程的EventLoop進行處理。
- 如果有n個從屬線程,則采用輪轉的思想,對新連接Connection進行從屬Reactor線程分配,就是獲取從LoopThread中獲取輪轉到的線程的EventLoop地址,將其設置給新連接的Connection。
?實現
class LoopThreadPool
{
private:int _thread_count; // 從屬線程的數量int _next_idx; // 用于控制輪轉的下標EventLoop* _baseloop; // 當從屬線程數量大于0,則baseloop只負責獲取新連接。如果等于0,則既負責獲取新連接,又負責事件處理std::vector<LoopThread*> _threads;// 保存各個從屬線程std::vector<EventLoop*> _loops; // 保存各個線程的EventLoop指針
public:LoopThreadPool(EventLoop* baseloop): _baseloop(baseloop), _thread_count(0), _next_idx(0){}void SetCount(int count) { _thread_count = count; }void Create(){if (_thread_count > 0){_threads.resize(_thread_count);_loops.resize(_thread_count);for (int i = 0; i < _thread_count; ++i){// 創建線程后,在線程執行的例程內部實例化EventLoop,所以如果阻塞了,那么不會執行到下一條語句獲取一個空的EventLoop_threads[i] = new LoopThread();_loops[i] = _threads[i]->GetLoop();}}}EventLoop* NextLoop(){if (_thread_count == 0)return _baseloop;_next_idx = (_next_idx + 1) % _thread_count;return _loops[_next_idx];}
};
?
TcpServer
整合所有模塊,方便上層直接使用。不需要用戶手動創建EventLoop對象baseloop,也不需要創建LoopThreadPool,只需要傳入端口號、從屬線程池的數量
管理:
- Acceptor對象,創建一個監聽套接字
- EventLoop對象,即baseloop,實現對監聽套接字的事件監控
- std::unordered_map<uint64_t, std::shared_ptr<Connection>> _conns,實現對所有新建連接的管理
- LoopThreadPool對象,創建loop線程池,對新建連接輪轉分配EventLoop,實現從屬Reactor的負載均衡
功能:
- 設置從屬線程池中線程的數量
- 啟動服務器
- 設置各種階段回調函數(連接建立完成、業務處理、關閉、任意),是用戶設置給TcpServer,TcpServer再設置給新獲取連接的Connection
- 是否啟動非活躍連接超時銷毀功能
- 添加定時任務功能?
?流程:
- 在TcpServer中實例化一個Acceptor對象,以及一個EventLoop對象(baseloop)
- 將Acceptor掛到baseloop上進行事件監控
- 一旦Acceptor對象即listenfd讀事件就緒,則執行讀事件回調函數獲取新連接
- 對新連接創建一個Connection對象進行統籌管理
- 將新連接對應的Connection掛到LoopThreadPool返回的EventLoop上進行事件監控管理
- 一旦Connection對應的連接就緒了可讀事件,則執行可讀事件的回調函數,讀取數據然后再調傭TcpServer設置的階段回調函數
?實現
class TcpServer
{
private:uint64_t _conn_id; // 唯一標識連接的id,自動增長 int _timeout; // 非活躍連接的超時時間bool _is_inactive_release; // 是否啟動非活躍銷毀功能int _port; // 服務器要綁定的端口號EventLoop _baseloop; // 監聽套接字對應的EventLoopAcceptor _acceptor; // 監聽套接字管理LoopThreadPool _threadpool; // 從屬線程管理std::unordered_map<uint64_t, std::shared_ptr<Connection>> _conns; // shared_ptr保存所有連接using ConnectedCallback = std::function<void(const std::shared_ptr<Connection>&)>;using MessageCallback = std::function<void(const std::shared_ptr<Connection>&, Buffer*)>;using CloseCallback = std::function<void(const std::shared_ptr<Connection>&)>;using AnyEventCallback = std::function<void(const std::shared_ptr<Connection>&)>;ConnectedCallback _conn_cb;// 連接建立后,上層需要的業務處理函數MessageCallback _msg_cb; // 接收數據后,上層需要的業務處理函數CloseCallback _close_cb; // 連接關閉后,上層需要的業務處理函數AnyEventCallback _event_cb; // 任意事件觸發,上層需要的業務處理函數
private:// 監聽套接字的讀事件就緒回調void NewConnection(int fd){_conn_id++;std::shared_ptr<Connection> conn(new Connection(_threadpool.NextLoop(), _conn_id, fd));conn->SetConnectedCallback(_conn_cb); // 連接建立成功時,上層業務處理的回調函數conn->SetMessageCallback(_msg_cb); // 緩沖區有數據就緒,上層業務處理的回調函數conn->SetClosedCallback(_close_cb); // 連接關閉時,需要將_conns中對Connection記錄shared_ptr刪除的的回調函數conn->SetAnyEventCallback(_event_cb);conn->SetSvrClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));// 是否啟動非活躍銷毀if (_is_inactive_release) conn->EnableInactiveRelease(_timeout);// 就緒初始化conn->Established();_conns.insert(std::make_pair(_conn_id, conn));}void RemoveConnectionInLoop(const std::shared_ptr<Connection>& conn){if (!conn) return;auto it = _conns.find(conn->id());if (it != _conns.end()) {_conns.erase(it);}}void RunAfterInLoop(const TaskFunc& task, int delay){_conn_id++;_baseloop.AddTimerTask(_conn_id, delay, task);}// 從_connections中移除對應鏈接的shared_ptrvoid RemoveConnection(const std::shared_ptr<Connection>& conn){_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));}
public:TcpServer(int port): _port(port), _conn_id(0), _timeout(0), _is_inactive_release(false), _acceptor(&_baseloop, port), _threadpool(&_baseloop){// 不能直接在這里Create,因為我們還沒有設置threadcount,所以默認為0,返回EventLoop時會出錯,應該放在Start中// _threadpool.Create(); // 創建線程池中的從屬線程_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));_acceptor.Listen(); // 將監聽套接字掛到baseloop上}// 設置從屬線程數量void SetThreadCount(int count) { _threadpool.SetCount(count); }// 用于啟動非活躍連接的銷毀功能void EnableInactiveRelease(int timeout){_is_inactive_release = true;_timeout = timeout;}// 用于添加一個定時任務void RunAfter(const TaskFunc& cb, int delay){_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, cb, delay));}void Start() {// 注意!!!在沒有設置threadcount前,不能調用Create_threadpool.Create();_baseloop.Start();}void SetConnectedCallback(const ConnectedCallback& cb) { _conn_cb = cb; }void SetMessageCallback(const MessageCallback& cb) { _msg_cb = cb; }void SetClosedCallback(const CloseCallback& cb) { _close_cb = cb; }void SetAnyEventCallback(const AnyEventCallback& cb) { _event_cb = cb; }
};
EchoServer
我們已經完成了所有的模塊實現,現在通過搭建一個簡單的回響服務器來觀察整體效果
EchoServer.hpp?
#include "../Server.hpp"class EchoServer
{
private:TcpServer _tcpsvr;
private:void OnConnected(const std::shared_ptr<Connection>& conn){DBG_LOG("new connection: %p", conn.get());}void OnMessage(const std::shared_ptr<Connection>& conn, Buffer *buf){if (!conn || !buf) return;if (buf->ReadAbleSize() == 0) return;DBG_LOG("%s", buf->ReadPosition());conn->Send(buf->ReadPosition(), buf->ReadAbleSize());buf->MoveReadPos(buf->ReadAbleSize());conn->Shutdown();}void OnClosed(const std::shared_ptr<Connection>& conn){DBG_LOG("Connection Destory");}
public:EchoServer(int port): _tcpsvr(port){_tcpsvr.SetThreadCount(3);_tcpsvr.EnableInactiveRelease(10);_tcpsvr.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1));_tcpsvr.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));_tcpsvr.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1));}void Start(){_tcpsvr.Start();}
};
main.cc
#include "echo.hpp"int main()
{EchoServer echo(8080);echo.Start();return 0;
}
client.cc
#include "../Server.hpp"int main()
{Socket client;client.CreateClient(8080, "127.0.0.1");int cnt = 5;while (cnt--){// 發std::string str = "hello Linux";client.Send(str.c_str(), str.size());// 收char buffer[1024] = {0};client.Recv(buffer, sizeof(buffer) - 1);DBG_LOG("%s", buffer);sleep(1);}while(true) sleep(1);return 0;
}
?
?測試
可以看到客戶端發送了5次數據后停止發送,服務端收到了五次數據然后回顯五次,客戶端也收到并打印了出來,并且在超時時間10s過后服務端主動關閉了連接。
性能測試
WebBench工具
WebBench是一個輕量級的網站壓力測試工具,由紅帽公司開發并維護。它主要用于評估HTTP服務器在高負載情況下的性能表現。通過模擬大量的并發用戶訪問請求,可以對目標站點的壓力承受能力、響應時間和吞吐量等指標進行全面分析。
WebBench通過建立大量的客戶端到服務器端之間的并行TCP連接來發起請求。這模仿了真實環境中眾多用戶的同時在線場景。
一旦建立了足夠的連接數之后,WebBench將開始不斷地向指定的目標URL地址發出GET或其他類型的HTTP查詢指令(取決于配置設定),直到達到預設的最大次數或持續時間為止。
測試
由于云服務器的帶寬很低,所以如果是不同云服務器之間進行高并發請求,那么效率很低,所以我們使用了本地環回,本地請求,這樣就忽略了帶寬影響,但是又有一個問題,本地服務器既響應,又多進程請求,他們會爭奪CPU,會降低服務器效率,也不合理,所以我們分別在云服務器上測試一次,在虛擬機上測試一次
云服務器配置為2核1G,我們在云服務器上同時運行服務端與webbench進行4000并發量測試
虛擬機配置為4核4G,我們在虛擬機上同時運行服務端與webbench進行5000并發量測試
HTTPServer
對于這個模塊,我將會在另一篇文章中講解并測試