目錄
設計思路
類的設計
模塊的實現
私有接口
公有接口?
設計思路
我們說過一個EventLoop要綁定一個線程,未來該EventLoop所管理的所有的連接的操作都需要在這個EventLoop綁定的線程中進行,所以我們該如何實現將EventLoop和線程綁定呢?
按照我們前面實現的EventLoop的邏輯,構造函數的時候,_thread_id(std::this_thread::get_id()) 也就是EventLoop構造時就直接綁定了當前線程了,那么我們就需要先創建線程,然后在線程的入口函數中來創建一個EventLoop對象,這樣我們的EventLoop對象就和一個線程綁定了,創建完之后它可以將這個EventLoop的指針返回給TcpServer用來分配給新連接進行關聯。
那么我們可不可以先創建一批EventLoop對象再來綁定線程呢?
從技術的角度來說當然可以,我們只需要設置一個接口用來設置EventLoop的_thread_id就行了,但是這樣做會存在一個問題:當我們創建一批EventLoop的時候,再創建一批線程來進行綁定之前,是有一個時間窗口?的,在這個時間窗口中,如果有新連接到來,且有新事件到來,那么就會出現問題。
就好比一家繁忙的餐廳,你是經理需要分配服務員(線程)負責特定的餐桌區域(EventLoop)。正確的做法是:先雇傭服務員,培訓他們,然后才開門營業接待客人。但如果你決定先開門營業,讓餐桌區域準備好接待客人,然后才開始招聘和分配服務員,就會出現危險的時間窗口:
客人已經入座點餐(連接已建立),甚至食物已經準備好(事件已到來),但沒有服務員知道這些餐桌是他們負責的!結果就是客人坐在那里等待,沒人來服務他們,食物在廚房里變冷,而餐廳陷入混亂。
也就是我們的_thread_id還沒有切換到我們想要的線程id上,還在創建EventLoop的線程中,那么這時候操作就是由這個創建EventLoop的線程執行了,而后續切換線程之后,又會由新的線程來執行操作,那么就會出現一個連接的所有操作并不在一個線程中全部完成,可能會出現在多個線程中執行的情況,
再想象這樣一個場景:餐廳開始營業,最初由經理(創建EventLoop的線程)臨時擔任服務員的角色,開始接待客人和處理訂單。然后在客人就餐過程中,經理突然告訴新來的服務員:"從現在開始,這些桌子由你負責了",然后經理離開去做其他工作。
這會導致嚴重的混亂:
- 服務員不知道這些客人已經點了什么菜
- 不清楚客人的特殊要求或過敏信息
- 不了解客人的用餐進度
- 甚至可能重復上菜或忘記上某些菜品
客人的體驗會非常糟糕,因為他們的服務被分割在兩個不同的服務人員之間,沒有連貫性和一致性。
這是我們不想看到的,我們要確保一個連接的所有操作都在一個線程中執行,所以我們不能采取這種方案。
那么我們的方案:就只能是先創建線程再創建EventLop對象,后續會通過特定的方式返回這個EventLoop的指針交給TcpServer進行分配。
為了方便操作,我們可以設置一個新的模塊,也就是EventLoopThread模塊,專門用于創建一個線程并創建綁定一個EventLoop。
它內部有兩個成員,一個就是我們的線程對象,另一個就是我們的EventLoop的指針。
所以我們要設置線程入口函數,然后在線程入口函數中創建一個EventLoop對象,創建完之后執行EventLoop的Start來完成事件的循環。
但是這時候就會有一個問題,因為線程的創建到創建好一個EventLoop對象并設置指針變量是有一個時間窗口的,同時,在設置我們的指針成員的同時,有可能由其他的線程需要獲取這個指針,那么就會出現讀寫并發的情況,或者說這個指針變量會有線程安全問題。
我們還拿餐廳的例子進行加以理解
想象餐廳正在準備開業,經理在招聘并培訓新的服務員。每個服務員需要熟悉自己的工作區域、學習餐廳系統并拿到自己的工作牌(相當于創建EventLoop并設置線程ID的過程)。
這時候,另一位經理(其他線程)已經開始在前臺接待客人,并試圖將客人分配給"正在培訓中"的服務員。但問題是,這些服務員可能還沒有完成培訓,沒有拿到工作牌,甚至可能還沒有被正式雇傭!
那么我們需要保護這個指針指針變量的互斥與同步訪問,需要使用一個互斥鎖和一個條件變量來保證指針的安全。
在餐廳中,解決方案也是使用一個協調板和一個明確的流程:
- 招聘和培訓完全結束后,才將服務員的信息放到協調板上
- 前臺經理只查看協調板上已確認可用的服務員
- 使用一個信號系統(如專門的管理員)確保協調板的更新和查看不會同時發生
類的設計
綜上我們知道了EventLoop的流程
- 首先,EventLoopThread對象被創建(可能在主線程或其他線程中)
- 當調用EventLoopThread的構造方法時,它會創建一個新的線程
- 這個新線程開始執行StartRoutine()函數
- 在StartRoutine()函數內部,線程創建一個新的EventLoop對象
- 線程將這個EventLoop對象的指針安全地賦值給共享變量_loop
- 通知等待的線程EventLoop已經創建完成
- 開始運行EventLoop的事件循環
那么EventLoopThread類的成員如下:
class EventLoopThread
{
private:EventLoop* _loop;std::thread _thread;std::mutex _mutex; //保護_loop安全std::condition_variable _cond; //實現同步
private:
//線程的入口函數void StartRoutine();
public: EventLoopThread(){}//提供一個接口用于獲取內部的EventLoop//意味著這個_loop會被多個線程競爭,那么需要鎖和條件變量來實現同步互斥//因為未來線程剛創建的時候,在還沒有創建好EventLoop對象的時候,這時候就可能會被主線程或者其他線程來獲取Loop了,那么這時候是線程不安全的,所以需要加鎖保護//同時,為了防止線程中的EventLoop對象還沒創建就有線程來獲取,我們需要再使用一個條件變量。 申請到鎖之后,如果條件不滿足,線程還需要在條件變量下等待,直到條件滿足再來競爭鎖并獲取鎖EventLoop* GetEventLoop();
};
模塊的實現
入口函數很簡單,無非就是加鎖創建完EventLoop對象之后,喚醒在條件變量下等待的線程,然后就開始執行EventLoop的Start循環邏輯。
私有接口
//線程的入口函數void StartRoutine(){//加鎖創建對象EventLoop* loop = new EventLoop();{std::unique_lock<std::mutex> lock(_mutex);_loop = loop;}//喚醒條件變量下的線程_cond.notify_all();//啟動EventLoop循環loop -> Start();}
公有接口?
然后就是構造函數,無非就是初始化_loop和設置thread的入口函數
EventLoopThread():_loop(nullptr),_thread(std::bind(&EventLoopThread::StartRoutine, this)) // 創建一個新線程,并指定StartRoutine作為線程的入口函數{}
剩下的就是一個獲取EventLoop的接口了,其實無非就是加鎖和條件變量下等待這兩個步驟:
EventLoop* GetEventLoop(){EventLoop* ret = nullptr;{std::unique_lock<std::mutex> lock(_mutex); //加鎖_cond.wait(lock,[&](){return _loop!=nullptr;}); //判斷函數返回值為真//走到這里說明被喚醒了ret = _loop;}return ret;}