文章目錄
- 設計思路
- 三種協程的切換
協程調度模塊,需要把前面的線程模塊和協程模塊結合使用 ~
設計思路
- 構造函數定義 線程池 基本信息。
- start(),創建線程池,每個線程創建都執行 run()。
- 每個線程在 run() 里,查找任務隊列 m_tasks。如果獲取到任務后,創建協程并切換執行 ~ 如果沒任務切換到 idel 協程等待 ~
- 添加任務到 m_tasks。
- stop(),調用tickle(),喚醒所有線程,等待所有的任務完成。
主要的函數:
- 構造函數
- Scheduler(size_t threads, bool use_caller, const std::string &name) // 模板函數,添加任務
- start()
- run()
- stop()
主要的變量:
- 線程變量:
- static thread_local Scheduler* t_scheduler;當前線程的調度器,同一個調度器下的所有線程貢獻同一個實例。
- static thread_local Fiber* t_scheduler_fiber; 當前線程的調度協程,每個線程都獨一份。
- Scheduler類變量
- std::vector<Thread::ptr> m_threads; 線程池
- std::list<ScheduleTask> m_tasks; 任務隊列
- bool m_useCaller; 主線程是否添加調度
- 當 m_useCaller = true; 主線程添加調度
- Fiber::ptr m_rootFiber; 調度器所在線程的調度協程
- int m_rootThread; 調度所在的線程id
具體調度需要細分情況:
-
主線程不添加到調度器
這種較為簡單。- Scheduler(),定義線程池變量
- start(),創建子線程 執行 run()
- run(),如果是子線程,需要創建主協程賦值給 t_scheduler_fiber 作為調度協程。idle_fiber協程。cb_fiber任務協程。從任務隊列拿去任務,然后設置cb_fiber,切換執行。(主協程 <----> cb_fiber)。如果沒有任務,切換idel協程,阻塞(iomanager里會使用epoll_wait重寫這個方法,這里還只是象征性的 等待。重寫需要注意,idle_fiber是在while循環里,也就是只要不stop,idle_fiber會一直存在。)(主協程 <----> idle_fiber)。
- stop(),設置m_stopping,喚起子線程,等待任務執行結束。【純線程池 模型下,只要是外部線程即可stop】
-
主線程添加到調度器
其實這里,最重要的是 三協程的切換設置。
(主協程 — 調度協程 — 任務協程)- Scheduler(),定義線程池變量。創建調度協程賦值 m_rootFiber 作為當前主線程的調度協程,運行run()。賦值t_scheduler_fiber = m_rootFiber.get() ,這就是當前主線程的調度協程。賦值 m_rootThread 當前主線程(用于判斷是否是主線程)。
- start(),同上,創建子線程,執行run()
- run(),此時額外增加 主線程的 調度過程。如果是主線程,那么 t_scheduler_fiber 已經賦值為調度協程。直接拿去任務執行,或者切換idle等待。
- stop(),特殊性在于,主線程一直是主協程在 初始化/添加任務。只有在stop里,切換到 m_rootFiber 調度協程消費任務。【use_caller 模式下執行stop(),必須是主線程,因為我們需要 切換到 主線程里的調度協程 消費一下任務】
三種協程的切換
對于 主協程,調度協程,任務協程。
重構了 協程模塊 里的 yield 和 resume
yield:任務協程 --> 調度協程 —> 主協程
resume: 主協程 —> 調度協程 —> 任務協程
Fiber增加一個類變量
bool m_runInScheduler; // 本協程是否參與調度器調度,相當于當前協程是否是任務協程。
void Fiber::yield(){SYLAR_ASSERT(m_state == TERM || m_state == RUNNING) // 當前子協程可以是 TERM,RUNNINGif(m_state != TERM){ // 如果沒有結束,中途進行yield,狀態設置為READY,可能還會回來繼續執行。m_state = READY;}if(m_runInScheduler){if(swapcontext(&m_ctx, &(Scheduler::GetMainFiber()->m_ctx))){...}}else{if(swapcontext(&m_ctx, &(t_thread_fiber->m_ctx))){...}}
}void Fiber::resume(){SYLAR_ASSERT(m_state == READY);// 切換前,提前設置狀態和 當前線程運行的協程。SetThis(this);m_state = RUNNING;if(m_runInScheduler){ // 相當于當前協程,是任務協程。 t_scheduler_fiber --> t_fiberif(swapcontext(&(Scheduler::GetMainFiber()->m_ctx), &m_ctx)){...}}else { // t_thread_fiber --> t_scheduler_fiberif(swapcontext(&(t_thread_fiber->m_ctx), &m_ctx)){...}}
}