事件循環線程池的理解
- 前置知識
- reactor模型
- thread::start()方法的理解
- 創建線程池
- 子線程被喚醒的幾種情況
- 子線程被主線程喚醒
- 新連接到來
- 有消息需要發送時(多reactor情況時)
- 關閉連接時
- 子線程被喚醒執行任務
在 上一篇中,我們討論了關于簡單的線程池的實現,此次我們就基于簡單的線程池實現,深入剖析muduo網絡庫的事件循環線程池的代碼:
前置知識
reactor模型
thread::start()方法的理解
#include "Thread.h"
#include "CurrentThread.h"#include <semaphore.h>std::atomic_int Thread::numCreated_(0);Thread::Thread(ThreadFunc func, const std::string &name): started_(false), joined_(false), tid_(0), func_(std::move(func)), name_(name)
{setDefaultName();
}Thread::~Thread()
{if (started_ && !joined_){ // thread類提供了設置分離線程的方法 線程運行后后臺自動銷毀(非阻塞)thread_->detach(); }
}void Thread::start() // 一個Thread對象 記錄的就是一個新線程的詳細信息
{started_ = true;//定義一個信號量sem_t sem;//設置信號量sem,是用于線程間,初值為0sem_init(&sem, false, 0); // false指的是 不設置進程間共享// 開啟線程thread_ = std::shared_ptr<std::thread>(new std::thread([&]() {tid_ = CurrentThread::tid(); // 獲取線程的tid值sem_post(&sem);//sem值+1func_(); // 開啟一個新線程 專門執行該線程函數}));// 這里必須等待獲取上面新創建的線程的tid值sem_wait(&sem);
}void Thread::join()
{joined_ = true;thread_->join();//阻塞等待當前線程執行完畢
}void Thread::setDefaultName()
{int num = ++numCreated_;if (name_.empty()){char buf[32] = {0};snprintf(buf, sizeof buf, "Thread%d", num);name_ = buf;}
}
這個類是對std::thread進行了一個封裝,主要關注Thread::start
方法,核心實現是定義了一個信號量sem,以確保子線程能夠成功被創建,具體的創建流程如下圖。
創建線程池
- TcpServer啟動線程池
- 在線程池中根據傳入的線程數量創建事件循環線程EventLoopThread,并且將創建的事件循環線程加到線程池threads中去,并且調用EventLoopThread->loop函數,隨后綁定返回的已經創建好事件循環EventLoop
- 創建事件循環EventLoopThread的時候綁定函數threadFunc
- 調用thread_.start()函數,thread_創建了一個子線程,該子線程執行綁定的threadFunc函數。在threadFunc函數中,創建事件循環EventLoop,創建成功了通知主線程,并在開始事件循環EventLoop.loop();與此同時,主線程在循環等待子線程成功創建EventLoop,并接收創建好的EventLoop對象,并返回。
上述過程可以總結為:主線程隨著函數調用開辟子線程,子線程成功創建事件循環,并在在子線程中事件事件循環,實現了one loop per thread,主線程返回到創建事件循環線程池中,繼續執行其他任務。如下圖所示。
關于主線程的生命周期的理解:
EventLoop *EventLoopThread::startLoop()
{thread_.start(); // 啟用底層線程Thread類對象thread_中通過start()創建的線程EventLoop *loop = nullptr;{std::unique_lock<std::mutex> lock(mutex_);cond_.wait(lock, [this](){return loop_ != nullptr;});loop = loop_;}return loop;
}
這里是存在開辟了一個新的線程,可以稱為子線程,子線程中是執行eventloop.loop()事件循環的,在程序的運行過程中不會被回收;但是這個主線程的,隨著return loop
;的返回,主線程是不是會被釋放呢?其實不是的,這里的主線程可以看成是負責整個項目的實現的核心線程,當主線程執行startLoop()函數后創建了子線程并返回,主線程就繼續執行其他的函數,并不是說返回了就被銷毀了。
比如說主線程執行完startLoop()的返回順序是:EventLoopThread::startLoop()—>>> EventLoopThreadPool::start—>>>然后執行TcpServer::start()中的loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get())),跟調用順序是相反的。見上圖
感悟
:關于這一點理解起來還是花了挺久的,這個項目前前后后也是花了挺多時間看了,但是總是存在這里那里的小問題,不影響理解整體項目,但是這些小點是真的才能感受到項目的高明;也是花時間整理這些小知識點,才可以有更深的感悟。
子線程被喚醒的幾種情況
首先,我們需要強調一下,這里存在mainloop(一個)和subloop(多個),mainloop所在的線程為主線程,subloop所在的線程為子線程(可以理解為,subloop1對應為子線程1,subloop2對應為子線程2等)
子線程被主線程喚醒
新連接到來
當一切都初始化好后,MainLoop的Acceptor開始監聽,當有新的連接到來時,觸發新連接到來的回調函數,在回調函數中處理新連接的相關操作。
- 調用getNextLoop()函數
- 執行輪詢算法返回一個事件循環EventLoop
- 對得到的事件循環loop綁定TCP連接,并且設置回調函數(通過TcpConnection設置,但最終是設置到EventLoop中的Channel回調函數中),見下圖(圖片來源萬字長文梳理Muduo庫核心代碼及優秀編程細節思想剖析,更多可以參考這篇文章)
- 調用runInLoop,在對應的EventLoop中執行相應的回調函數。但是這里存在一個問題,即我是通過MainLoop上所在的線程選擇了一個子Loop,假如說為subLoop1,并且需要執行在subLoop1上綁定的函數,而當前的線程是MainLoop對應的線程。如果之間運行,這就不滿足每一個線程運行一個EventLoop的條件了(one loop per thread)。所以在這里,就需要判斷一下,是否是在當前loop中對應的線程執行的任務。如果不是,需要通過wakeup函數喚醒當前loop對應的線程。
- 然后loop對應的線程就被喚醒起來工作(相當于消費者來消費任務了),執行綁定的connectEstablished()函數,在connectEstablished()函數實現tie()函數綁定,并且設置監聽讀事件。
建立Tcp連接后,以后發生在這個連接上的所有事件都交由這個SubLoop1來負責了。
有消息需要發送時(多reactor情況時)
在這里插入代碼片
關閉連接時
當連接斷開或者關閉連接,執行TcpServer::removeConnection()
回調函數。
- 上層調用
TcpServer::removeConnection
函數,調用runInLoop
函數,并綁定執行TcpServer::removeConnectionInLoop
- 因為
removeConnectionInLoop
是執行在mainloop對應的線程(執行TcpServer類下的函數都在mainloop中對應的線程,也可以說在主線程中運行),所以這里之間是在當前的mainloop中執行回調,執行removeConnectionInLoop
函數 - 首先移除TCP連接,并且得到將要移除的連接對應的ioloop;因為當前是處于mainloop對應的主線程,所以ioloop執行調用
queueInLoop
- 在
queueInLoop
函數中,核心是異步喚醒自己對應的線程,并執行等待執行的任務(在這里是connectDestroyed) - 在
connectDestroyed
中,將待移除的TCP連接中的channel中的所有感興趣的事件從poller中移除掉,并將channel從poller中移除。
子線程被喚醒執行任務
我們知道,在線程池初始化過程中,每一個線程就對應一個事件循環(eventloop)已開始循環,并且當新連接到來時,mainloop按照輪詢方法將新連接分發給一個事件循環,以后這個事件循環就負責這個新連接的所有操作了(包括接收消息、發送消息等)。
void EventLoop::loop()
{looping_ = true;quit_ = false;LOG_INFO("EventLoop %p start looping\n", this);while (!quit_){activeChannels_.clear();pollRetureTime_ = poller_->poll(kPollTimeMs, &activeChannels_);for (Channel *channel : activeChannels_){// Poller監聽哪些channel發生了事件 然后上報給EventLoop 通知channel處理相應的事件channel->handleEvent(pollRetureTime_);}doPendingFunctors();//執行等待處理的函數,一般是主線程添加的。}LOG_INFO("EventLoop %p stop looping.\n", this);looping_ = false;
}
可以看到,初始化的事件循環eventloop在自己的線程內循環等待任務的來臨。
pollRetureTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
這句話是監聽對于每一個連接注冊的事件是否發生了(如果發生了,注冊到activeChannels_并依序執行);否則超時返回。