文章目錄
- P30-P32:協程調度01-03
- 一、Scheduler
- 局部變量
- FiberAndThread(任務結構體)
- 成員變量
- 調度協程
- 構造函數
- 析構函數
- start
- stop
- run
- stopping
- 二、參考資料
- P33-P35:協程調度04-06
- 一、測試1
- 二、測試2
- 總結
P30-P32:協程調度01-03
? 這里開始協程調度模塊,封裝了一個M:N協程調度器,創建N個協程在M個線程上運行,調度器的主要思想就是先查看任務隊列中有沒有任務需要執行,若沒有任務就進入空閑狀態,反之進行調度。
? 前面3小節搭建了調度器的基礎結構,下面我按照自己的理解以及他人的筆記內容對整個類進行解釋。
一、Scheduler
局部變量
兩個局部線程變量保存當前線程的協程調度器和主協程
// 協程調度器
static thread_local Scheduler* t_scheduler = nullptr;
// 線程主協程
static thread_local Fiber* t_fiber = nullptr;
FiberAndThread(任務結構體)
該結構體的作用是存儲協程、回調函數以及線程的信息
// 協程/函數/線程組struct FiberAndThread {Fiber::ptr fiber; // 協程std::function<void()> cb; // 回調函數int thread; // 線程IDFiberAndThread(Fiber::ptr f, int thr): fiber(f), thread(thr) {}FiberAndThread(Fiber::ptr* f, int thr):thread(thr) {// 因為傳入的是一個智能指針,我們使用后會造成引用數加一,可能會引發釋放問題,這里swap相當于把傳入的智能指針變成一個空指針// 這樣原智能指針的計數就會保持不變fiber.swap(*f); }FiberAndThread(std::function<void()> f, int thr): cb(f), thread(thr) {} FiberAndThread(std::function<void()>* f, int thr):thread(thr) {cb.swap(*f);} // 將一個類用到STL中必須要有默認構造函數,否則無法進行初始化FiberAndThread(): thread(-1) {}// 重置void reset() {fiber = nullptr;cb = nullptr;thread = -1;}};
成員變量
private:MutexType m_mutex; // 互斥量std::vector<Thread::ptr> m_threads; // 線程池std::list<FiberAndThread> m_fibers; // 等待執行的協程隊列Fiber::ptr m_rootFiber; // 主協程std::string m_name; // 協程調度器名稱protected:std::vector<int> m_threadIds; // 協程下的線程id數組size_t m_threadCount = 0; // 線程數量std::atomic<size_t> m_activeThreadCount = {0}; // 工作線程數量std::atomic<size_t> m_idleThreadCount = {0}; // 空閑線程數量bool m_stopping = true; // 是否正在停止bool m_autoStop = false; // m_autoStopint m_rootThread = 0; // 主線程id(use_caller)
調度協程
檢查任務隊列中有無任務,將任務加入到任務隊列中,若任務隊列中本來就已經有任務了,就tickle
進行通知
// 調度協程模板函數
template<class FiberOrCb>void schedule(FiberOrCb fc, int thread = -1) { // -1表示任意線程bool need_tickle = false;{MutexType::Lock lock(m_mutex);need_tickle = scheduleNoLock(fc, thread);}if(need_tickle) {tickle();}
}// 批量處理調度協程
template<class InputIterator>void schedule(InputIterator begin, InputIterator end, int thread = -1) {bool need_tickle = false;{MutexType::Lock lock(m_mutex);while(begin != end) {need_tickle = scheduleNoLock(&*begin, thread) || need_tickle;begin ++;}}if(need_tickle) {tickle();}
}// 協程調度啟動(無鎖)
template<class FiberOrCb>bool scheduleNoLock(FiberOrCb fc, int thread) {bool need_tickle = m_fibers.empty();FiberAndThread ft(fc, thread);if(ft.fiber || ft.cb) {m_fibers.push_back(ft);}return need_tickle;
}
構造函數
Scheduler::Scheduler(size_t threads, bool use_caller, const std::string& name): m_name(name) {// 確定線程數量大于0 SYLAR_ASSERT(threads > 0);// 是否將當前用于協程調度線程也納入調度器if(use_caller) {sylar::Fiber::GetThis(); // 這里獲得的主協程用于調度其余協程-- threads; // 線程數-1SYLAR_ASSERT(GetThis() == nullptr); // 防止出現多個調度器// 設置當前的協程調度器t_scheduler = this;// 將此fiber設置為 use_caller,協程則會與 Fiber::MainFunc() 綁定// 非靜態成員函數需要傳遞this指針作為第一個參數,用 std::bind()進行綁定m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run, this))); // 這個新協程用于執行方法// 設置線程名稱sylar::Thread::SetName(m_name);// 設置當前線程的主協程為m_rootFiber// 這里的m_rootFiber是該線程的主協程(執行run任務的協程),只有默認構造出來的fiber才是主協程t_fiber = m_rootFiber.get();// 獲得當前線程idm_rootThread = sylar::GetTreadId();m_threadIds.push_back(m_rootThread);} else { // 不將當前線程納入調度器m_rootThread = -1;}// 更新線程數量m_threadCount = threads;
}
析構函數
Scheduler::~Scheduler() {// 達到停止條件SYLAR_ASSERT(m_stopping);if(GetThis() == this) {t_scheduler = nullptr;}
}
start
void Scheduler::start() {SYLAR_LOG_INFO(g_logger) << "start()";MutexType::Lock lock(m_mutex);// 為false代表已經啟動了,直接返回if(!m_stopping) {return;}// 將停止狀態更新為falsem_stopping = false;// 線程池為空SYLAR_ASSERT(m_threads.empty());// 創建線程池m_threads.resize(m_threadCount);for(size_t i = 0; i < m_threadCount; ++ i) {// 遍歷每一個線程執行run任務m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this), m_name + " " + std::to_string(i)));m_threadIds.push_back(m_threads[i]->getId());}lock.unlock();// 在這里切換線程時,swap的話會將線程的主協程與當前協程交換,// 當使用use_caller時,t_fiber = m_rootFiber,call是將當前協程與主協程交換// 為了確保在啟動之后仍有任務加入任務隊列中,// 所以在stop()中做該線程的啟動,這樣就不會漏掉任務隊列中的任務if(m_rootFiber) {// m_rootFiber->swapIn();m_rootFiber->call();SYLAR_LOG_INFO(g_logger) << "call out" << m_rootFiber->getState();}SYLAR_LOG_INFO(g_logger) << "start() end";
}
stop
void Scheduler::stop() {SYLAR_LOG_INFO(g_logger) << "stop()";m_autoStop = true;// 使用use_caller,并且只有一個線程,并且主協程的狀態為結束或者初始化if(m_rootFiber && m_threadCount == 0 && (m_rootFiber->getState() == Fiber::TERM || m_rootFiber->getState() == Fiber::INIT)) {SYLAR_LOG_INFO(g_logger) << this << " stopped";// 停止狀態為truem_stopping = true;// 若達到停止條件則直接returnif(stopping()) {return;}}// bool exit_on_this_fiber = false;// use_caller線程// 當前調度器和t_secheduler相同if(m_rootThread != -1) {SYLAR_ASSERT(GetThis() == this);} else {SYLAR_ASSERT(GetThis() != this);}m_stopping = true;// 每個線程都tickle一下for(size_t i = 0; i < m_threadCount; ++ i) {tickle();}// 使用use_caller多tickle一下if(m_rootFiber) {tickle();}if(stopping()) {return;}
}
run
void Scheduler::run() {SYLAR_LOG_INFO(g_logger) << "run()";// 設置當前調度器setThis();// 非user_caller線程,設置主協程為線程主協程if(sylar::GetTreadId() != m_rootThread) {t_fiber = Fiber::GetThis().get();}// 定義idle_fiber,當任務隊列中的任務執行完之后,執行idle()Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this)));// 定義回調協程Fiber::ptr cb_fiber;// 定義一個任務結構體FiberAndThread ft;while(true) {// 重置也是一個初始化ft.reset();bool tickle_me = false;{// 從任務隊列中拿fiber和cbMutexType::Lock lock(m_mutex);auto it = m_fibers.begin();while(it != m_fibers.end()) {// 如果當前任務指定的線程不是當前線程,則跳過,并且tickle一下if(it->thread != -1 && it->thread != sylar::GetTreadId()) {++ it;tickle_me = true;continue;}// 確保fiber或cb存在SYLAR_ASSERT(it->fiber || it->cb);// 如果該fiber正在執行則跳過if(it->fiber && it->fiber->getState() == Fiber::EXEC) {++ it;continue;}// 取出該任務ft = *it;// 從任務隊列中清除m_fibers.erase(it);}}// 取到任務tickle一下if(tickle_me) {tickle();}// 執行拿到的線程if(ft.fiber && (ft.fiber->getState() != Fiber::TERM || ft.fiber->getState() != Fiber::EXCEPT)) {++ m_activeThreadCount;// 執行任務ft.fiber->swapIn();// 執行完成,活躍的線程數量減-1-- m_activeThreadCount;ft.fiber->swapIn();// 如果線程的狀態被置為了READYif(ft.fiber->getState() == Fiber::READY) {// 將fiber重新加入到任務隊列中schedule(ft.fiber);} else if(ft.fiber->getState() != Fiber::TERM && ft.fiber->getState() != Fiber::EXCEPT) {ft.fiber->m_state = Fiber::HOLD;}// 執行完畢重置數據ftft.reset();// 如果任務是回調函數} else if(ft.cb) {// cb_fiber存在,重置該fiberif(cb_fiber) {cb_fiber->reset(ft.cb);} else {// cb_fiber不存在則初始化一個cb_fiber.reset(new Fiber(ft.cb));ft.cb = nullptr;}// 重置數據ftft.reset();++ m_activeThreadCount;// 執行cb任務cb_fiber->swapIn();-- m_activeThreadCount;// 若cb_fiber狀態為READYif(cb_fiber->getState() == Fiber::READY) {// 重新放入任務隊列中schedule(cb_fiber);// 釋放智能指針cb_fiber.reset();// cb_fiber異常或結束,就重置狀態,可以再次使用該cb_fiber} else if(cb_fiber->getState() == Fiber::EXCEPT || cb_fiber->getState() == Fiber::TERM) {// cb_fiber的執行任務置空cb_fiber->reset(nullptr);} else {// 設置狀態為HOLD,此任務后面還會通過ft.fiber被拉起cb_fiber->m_state = Fiber::HOLD;// 釋放該智能指針,調用下一個任務時要重新new一個新的cb_fibercb_fiber.reset();}// 沒有任務執行} else {// 如果idle_fiber的狀態為TERM則結束循環,真正的結束if(idle_fiber->getState() == Fiber::TERM) {SYLAR_LOG_INFO(g_logger) << "idle fiber term";break;}// 正在執行idle的線程數量+1++ m_idleThreadCount;// 執行idle()// 正在執行idle的線程數量-1idle_fiber->swapIn();-- m_idleThreadCount;// idle_fiber狀態置為HOLDif(idle_fiber->getState() != Fiber::TERM&& idle_fiber->getState() != Fiber::EXCEPT) {idle_fiber->m_state = Fiber::HOLD;}}}
}
stopping
bool Scheduler::stopping() {MutexType::Lock lock(m_mutex);// 當自動停止 && 正在停止 && 任務隊列為空 && 活躍的線程數量為0return m_autoStop && m_stopping && m_fibers.empty() && m_activeThreadCount == 0;
}
二、參考資料
- 源碼
- 筆記
P33-P35:協程調度04-06
? 這幾節主要對協程調度的代碼進行了調試,針對幾個小bug視頻花了不少時間去解決,整個問題解決步驟我這里就不記錄了,下面通過一個具體的例子來演示協程調度器的作用。
一、測試1
#include "../sylar/sylar.h"sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();void test_fiber() {static int s_count = 5;SYLAR_LOG_INFO(g_logger) << "test in fiber s_count = " << s_count;sleep(1);if(-- s_count >= 0) {// 未指定線程ID,表示任意線程都能執行任務sylar::Scheduler::GetThis()->schedule(&test_fiber);}}int main(int argc, char** argv) {SYLAR_LOG_INFO(g_logger) << "main";sylar::Scheduler sc(3,false, "test");sleep(2);sc.start();sc.schedule(&test_fiber);sc.stop();SYLAR_LOG_INFO(g_logger) << "over";return 0;
}
? 上面這段測試定義了3個線程,并且將use_caller設為false,表示不讓用于調度的線程執行任務。此外在執行任務時,由于沒有指定線程ID,說明任意線程都能執行任務。
結果
可以看到3個協程不停的切換執行了任務
二、測試2
下面測試修改了兩處代碼,首先在執行任務時指定第一個執行任務的線程去執行所有的任務,其次將use_caller設置為true,表示用于調度的線程也能參與執行任務,那么就可以少開一個線程,提高效率。
sylar::Scheduler::GetThis()->schedule(&test_fiber, sylar::GetTreadId());sylar::Scheduler sc(3,true, "test");
結果
可以看到所有的任務都是由下標為1的線程去執行,并且線程池中的線程一共就只有3個。
總結
? 這6節視頻全部看完后并且調試通代碼才大概在功能層面了解sylar做的這個協程調度器,一開始聽的時候確實很迷茫,一直在不停改代碼但不知道為什么要去改。不過最后也還沒有完全搞懂協程調度器的細節,現在這個階段能理解代碼的運行邏輯就行了,后面二刷的時候再去深究,總之后面的內容越來越復雜了,對于我這種之前沒有服務器基礎的理解起來相當困難。