仿muduo庫實現并發服務器
- 1.LoopThread模塊
- 1.1成員變量
- 1.2構造函數
- 13線程入口函數
- 1.4獲取eventloop對象GetLoop()
- 2.LoopThreadPool模塊
- 2.1成員變量
- 2.2構造函數
- 2.3配置線程數量
- 2.4按照配置數量創建線程
- 2.5依次分配Eventloop對象
1.LoopThread模塊
這個模塊是為了將EventLoop與線程整合起來。一個EventLoop對應一個線程。
要知道EventLoop對象實例化時就會將它的thread_id綁定,然后后續的RuninLoop操作里面就會判斷當前執行流的線程id與eventloop對應的線程id進行比較,如果相同就說明是同一個線程,如果不相同,就說明不是同一個線程執行。
這里要注意的是:
eventloop對象的實例化必須在線程創建之后,也就是在線程的入口函數中進行創建,然后eventloop對象一創建出來就綁定該線程。
就是構造一個新的模塊,它在里邊既有線程又有eventloop,而且它是先有線程然后在線程里邊再去實例化eventlop,這樣就能夠保證eventloop在出現的第一時刻就跟我們的一個線程是綁定到一起的。
【獲取eventloop對象】
這個模塊,它將線程跟eventloop給整合到一起了,那么問題就來了,那當我們外界說一個新連接到來了,現在要給它分配一個eventloop對象,怎么給它分配呢?我總得能夠獲取到這個evetloop對象吧,所以在這個里邊我們需要提供一個接口叫做GetLoop,獲取到eventloop對象的指針。
那么當外界可以獲取到這個eventloop對象指針之后,當新連接到來時,就可以將該線程的eventloop分配給該線程,該連接它就綁定在該eventloop對應的線程上工作。
比如在執行各種操作的時候,進行事件監控的時候,它就會綁定到我們eventloop對應的線程里邊去,要進行某些操作的時候也是把任務壓入到我們對eventloop,它的任務隊列里邊。
【獲取evnetloop同步問題】
要避免當我們的線程剛創建起來之后,還沒來得及實例化創建eventloop該對象的時候,這時候外界如果要獲取該線程的eventloop,調用Getloop操作時,獲取到的就為空。所以這里存在一個同步問題,也就是從屬線程要先實例化eventloop對象,主線程才能夠獲取eventloop對象。
所以這里面需要兩個東西:鎖與條件變量。
這2個東西,它的一個功能就是用于那么實現我們的一個loop操作的一個同步關系。因為呢?我們要避免那么線程創建之后,但是我們的eventloop,還沒有被實例化之前就去獲取loop的操作,這樣的獲取是會出問題,就是一個空了對不對所以要給它同步管理起來,也就是說你獲取loop的時候,必須是它loop已經那么實例化完了才可以之間的,你如果沒有實例化完那就不讓你獲取。
也就是在獲取eventloop對象指針時,設置一個條件:eventloop對象指針不為空。
如果這個時候不滿足,則主線程就去條件變量下阻塞等待,如果滿足條件就會被喚醒,去獲取eventloop對象指針。
而在線程的入口函數里面,在剛創建實例化eventloop對象之后,就立即發送通知,喚醒主線程。
1.1成員變量
private:std::thread _thread; // 用來創建線程,eventloop綁定的線程EventLoop *_loop; // Eventloop的指針變量,這里不能實例化,必須在創建線程里面實例化eventloop// 下面兩個用來保證loop的同步性,必須先實例化完loop,才能獲取loopstd::mutex _mutex; // 鎖,用來保護loop資源std::condition_variable _cond; // 條件變量
1.2構造函數
在構造中,就使用C++中的線程庫,創建線程。
所以該模塊對象一旦創建出來,就代表創建了一個線程以及對應的eventloop對象。
public:LoopThread() : _loop(NULL), _thread(std::bind(&LoopThread::ThreadEntry, this)){}
13線程入口函數
該線程一旦創建出來,就要執行的任務就是:
1.創建eventloop對象
2.喚醒主線程
3.啟動eventloop
(eventloop一旦啟動它就會循環監控在它上面的描述符的各種事件是否就緒,一旦就緒就會獲取就緒的事件以及描述符,執行對應的回調函數,并且執行任務隊列中的任務)
private:// 線程的入口函數void ThreadEntry(){// 首先創建對應的Eventloop對象EventLoop loop;_loop = &loop;// 創建完就喚醒可能阻塞的主線程_cond.notify_all();// 該線程就開始啟動工作:_loop->Start();// 1.進行事件監控,并獲取就緒的事件和fd 2.進行就緒事件處理3.執行任務隊列里的任務}
1.4獲取eventloop對象GetLoop()
// 獲取該線程對應的loop是由主線執行的,所以存在線程安全問題,并且還要保證loop的同步問題EventLoop *GetLoop(){EventLoop *loop = NULL; // 為了保證臨界資源_loop的安全性,在訪問時最后將臨界資源拷貝過去,最后返回的時候就不涉及臨界資源的訪問{std::unique_lock<std::mutex> _lock(_mutex); // 加鎖// 確保獲取_loop之前已經被實例化_cond.wait(_lock, [&](){ return _loop != NULL; }); // 如果這時_loop為空,則主線程就一直阻塞在條件變量的隊列中loop = _loop;}return loop;}
實現了我們的一個主從react模型,主線程只負責對監聽套件字的處理,獲取新連接
保證我們連接的一個獲取效率不會因為我們的一個業務的處理而導致無法去獲取連接
而我們的從屬react呢?當它獲取到新連接之后,將我們的新連接分配給我們的從屬react線程中,讓從屬線程對它進行一個事件監控,進行事件監控然后事件的處理以及業務的處理。
2.LoopThreadPool模塊
LoopThreadPool線程池是對所有的LoopThread線程進行管理及分配。
主要提供的功能是:
1.能夠讓用戶設置創建幾個線程。
不過要注意當用戶設置的從線程數據為0時,這時候新連接的事件監控就必須要有主線程中的baseloop進行監控管理。所以該線程池中必須要傳入主線程的baseloop對象,以防止從屬線程的數量為0。
在主從Reactor模型中,主線程只負責獲取新連接,而從屬線程負責對連接進行監控和處理,當線程池中線程的個數為0時,就變成了單Reactor服務器,主線程既負責或者新連接,也負責新連接的監控與處理。
2.能夠將線程池中對應線程的eventloop對象分配出去。
如果線程池中線程的個數為0 ,則將主線程的Eventloop分配給新連接,進行監控與處理。
如果線程池中線程的個數有多個,則采用RR輪轉思想,進行線程的分配。(就是將對應線程的Eventloop對象獲取到然后分配給Connection)
將所有線程的Eventloop對象都存儲起來,然后依次分配出去即可。
2.1成員變量
private:int _thread_count; // 要創建的線程的數量int _loop_idx; // 要分配的loop下標EventLoop *_baseloop; // 主線程的loop,外界傳入進來,如果要創建的從線程數據為0,那么就分配出去的就是主線程的loopstd::vector<LoopThread *> _threads; // 保存所有的LoopThread線程對象std::vector<EventLoop *> _loops; // 保存所有線程對應的eventloop,然后依次分配出去。
2.2構造函數
線程的數量在初始化時不設置,由用戶調用接口設置。
外界需要將主線程的baseloop對象傳入進來,以防止線程池中沒有線程。
LoopThreadPool(EventLoop *baseloop) : _thread_count(0), _loop_idx(0), _baseloop(baseloop) {}
2.3配置線程數量
void SetThreadCount(int count){_thread_count = count;}
2.4按照配置數量創建線程
創建線程時,將所有線程以及線程對應的eventloop對象都存儲在數組中。
然后根據下標就可以依次分配線程對應的eventloop對象了。
void CreateLoopThread(){if (_thread_count > 0) // 如果大于0就將所有的線程都創建出來,如果小于0,那么就必須要用主線程的baseloop。{_threads.resize(_thread_count);_loops.resize(_thread_count);for (int i = 0; i < _thread_count; i++){// 創建線程,并將所有的線程管理起來_threads[i] = new LoopThread();// 并保存管理所有線程對應的loop_loops[i] = _threads[i]->GetLoop();}}}
2.5依次分配Eventloop對象
如果線程池中線程的個數為0,這時候分配出去的eventloop對象就是主線程的baseloop對象。主線程eventloop既獲取連接,也進行連接監控與處理。
否則就按照輪轉形式依次分配出去。
EventLoop *AssignLoop(){// 如果從線程的數量為0,那么分配出去的loop就是主線程對應的loopif (_thread_count == 0){return _baseloop;}_loop_idx = (_loop_idx + 1) % _thread_count;return _loops[_loop_idx];}