TaskControl
- 1 Definition
- 2 Introduce
- **核心職責**
- 3 成員解析
- **3.1 數據結構與線程管理**
- **3.2 任務調度與負載均衡**
- **3.3 線程停放與喚醒(ParkingLot)**
- **3.4 統計與監控**
- 4 **工作流程**
- 5 **設計亮點**
- 6 **使用場景示例**
- 7 **總結**
- 8 學習過程中的疑問
- 8.1 init函數為什么不在構造函數中調用
1 Definition
class TaskControl {friend class TaskGroup; // 友元類friend void wait_for_butex(void*); // 友元函數
#ifdef BRPC_BTHREAD_TRACERfriend bthread_t init_for_pthread_stack_trace(); // 友元函數
#endif // BRPC_BTHREAD_TRACERpublic:TaskControl();~TaskControl();// Must be called before using. `nconcurrency' is # of worker pthreads.int init(int nconcurrency);// Create a TaskGroup in this control.TaskGroup* create_group(bthread_tag_t tag);// Steal a task from a "random" group.bool steal_task(bthread_t* tid, size_t* seed, size_t offset);// Tell other groups that `n' tasks was just added to caller's runqueuevoid signal_task(int num_task, bthread_tag_t tag);// Stop and join worker threads in TaskControl.void stop_and_join();// Get # of worker threads.int concurrency() const { return _concurrency.load(butil::memory_order_acquire); }int concurrency(bthread_tag_t tag) const { return _tagged_ngroup[tag].load(butil::memory_order_acquire); }void print_rq_sizes(std::ostream& os);double get_cumulated_worker_time();double get_cumulated_worker_time_with_tag(bthread_tag_t tag);int64_t get_cumulated_switch_count();int64_t get_cumulated_signal_count();// [Not thread safe] Add more worker threads.// Return the number of workers actually added, which may be less than |num|int add_workers(int num, bthread_tag_t tag);// Choose one TaskGroup (randomly right now).// If this method is called after init(), it never returns NULL.TaskGroup* choose_one_group(bthread_tag_t tag);#ifdef BRPC_BTHREAD_TRACER// A stacktrace of bthread can be helpful in debugging.void stack_trace(std::ostream& os, bthread_t tid);std::string stack_trace(bthread_t tid);
#endif // BRPC_BTHREAD_TRACERprivate:typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;static const int PARKING_LOT_NUM = 4;typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot;// Add/Remove a TaskGroup.// Returns 0 on success, -1 otherwise.int _add_group(TaskGroup*, bthread_tag_t tag);int _destroy_group(TaskGroup*);// Tag groupTaggedGroups& tag_group(bthread_tag_t tag) { return _tagged_groups[tag]; }// Tag ngroupbutil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; }// Tag parking slotTaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; }static void delete_task_group(void* arg);static void* worker_thread(void* task_control);template <typename F>void for_each_task_group(F const& f);bvar::LatencyRecorder& exposed_pending_time();bvar::LatencyRecorder* create_exposed_pending_time();bvar::Adder<int64_t>& tag_nworkers(bthread_tag_t tag);bvar::Adder<int64_t>& tag_nbthreads(bthread_tag_t tag);std::vector<butil::atomic<size_t>> _tagged_ngroup;std::vector<TaggedGroups> _tagged_groups;butil::Mutex _modify_group_mutex;butil::atomic<bool> _init; // if not init, bvar will case coredumpbool _stop;butil::atomic<int> _concurrency;std::vector<pthread_t> _workers;butil::atomic<int> _next_worker_id;bvar::Adder<int64_t> _nworkers;butil::Mutex _pending_time_mutex;butil::atomic<bvar::LatencyRecorder*> _pending_time;bvar::PassiveStatus<double> _cumulated_worker_time;bvar::PerSecond<bvar::PassiveStatus<double> > _worker_usage_second;bvar::PassiveStatus<int64_t> _cumulated_switch_count;bvar::PerSecond<bvar::PassiveStatus<int64_t> > _switch_per_second;bvar::PassiveStatus<int64_t> _cumulated_signal_count;bvar::PerSecond<bvar::PassiveStatus<int64_t> > _signal_per_second;bvar::PassiveStatus<std::string> _status;bvar::Adder<int64_t> _nbthreads;std::vector<bvar::Adder<int64_t>*> _tagged_nworkers;std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second;std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;std::vector<TaggedParkingLot> _pl;#ifdef BRPC_BTHREAD_TRACERTaskTracer _task_tracer;
#endif // BRPC_BTHREAD_TRACER};
2 Introduce
TaskControl作為任務調度控制中心,管理多個任務組(TaskGroup
)并協調工作線程的高效運作,適用于BRPC的bthread協程庫:
核心職責
- 任務組管理:創建、銷毀任務組,支持按標簽(
bthread_tag_t
)分類管理。 - 線程池調度:動態調整工作線程數量,實現任務竊取(Work Stealing)以平衡負載。
- 同步與喚醒:通過停放區(
ParkingLot
)管理線程的休眠與喚醒。 - 性能監控:集成統計模塊(
bvar
)跟蹤任務處理時間、切換次數等指標。
3 成員解析
3.1 數據結構與線程管理
-
_tagged_groups
類型:std::vector<TaggedGroups>
作用:按標簽存儲任務組指針數組,每個標簽對應一個TaggedGroups
(固定大小為BTHREAD_MAX_CONCURRENCY
的數組)。
示例:標簽可用于區分不同業務優先級或租戶的任務。 -
_tagged_ngroup
類型:std::vector<butil::atomic<size_t>>
作用:記錄每個標簽下的任務組數量,原子操作保證線程安全。 -
_workers
類型:std::vector<pthread_t>
作用:保存所有工作線程的ID,用于線程生命周期管理(啟動、停止、回收)。 -
_concurrency
類型:butil::atomic<int>
作用:總工作線程數,支持原子讀寫,動態調整并發度。
3.2 任務調度與負載均衡
-
steal_task(bthread_t* tid, size_t* seed, size_t offset)
作用:從其他任務組竊取任務,避免工作線程空閑。
實現:- 使用
seed
隨機選擇目標組,結合offset
避免多個線程競爭同一隊列。 - 竊取成功返回
true
,任務ID存入tid
。
- 使用
-
signal_task(int num_task, bthread_tag_t tag)
作用:通知任務組有新任務加入,觸發喚醒機制。
場景:當任務被添加到隊列時,調用此方法喚醒可能休眠的線程。 -
choose_one_group(bthread_tag_t tag)
作用:根據標簽選擇一個任務組,用于任務分發或負載均衡。
策略:可能采用輪詢或隨機算法選擇組,確保任務均勻分配。
3.3 線程停放與喚醒(ParkingLot)
_pl
類型:std::vector<TaggedParkingLot>
作用:按標簽管理的停放區數組,每個標簽對應PARKING_LOT_NUM
個停放區。
機制:- 工作線程無任務時進入停放區等待,減少CPU空轉。
- 新任務到達時,通過停放區喚醒線程,降低延遲。
3.4 統計與監控
-
bvar
集成
關鍵指標:_cumulated_worker_time
:累計任務處理時間。_cumulated_switch_count
:上下文切換次數。_signal_per_second
:每秒任務喚醒次數。
作用:通過BRPC的bvar
庫暴露性能指標,方便實時監控與調優。
-
標簽化統計
成員如_tagged_nworkers
、_tagged_cumulated_worker_time
等,按標簽細分統計,支持多維分析。
4 工作流程
-
初始化
- 調用
init(nconcurrency)
創建指定數量工作線程,每個線程執行worker_thread
函數。 - 工作線程通過
choose_one_group
選擇任務組,執行任務循環。
- 調用
-
任務執行
- 線程從本地任務組獲取任務,若隊列為空,嘗試從其他組竊取(
steal_task
)。 - 無任務可執行時,進入停放區(
ParkingLot
)休眠。
- 線程從本地任務組獲取任務,若隊列為空,嘗試從其他組竊取(
-
任務通知
- 添加新任務時,調用
signal_task
遞增信號計數器,喚醒停放區線程。
- 添加新任務時,調用
-
動態擴縮容
add_workers
動態增加指定標簽的工作線程,適應負載變化。
-
停止與清理
stop_and_join
設置_stop
標志,通知所有線程退出,并回收資源。
5 設計亮點
- 標簽化分組
支持多維度任務分類,適用于混合部署場景(如不同服務優先級)。 - 任務竊取
避免工作線程閑置,提升CPU利用率,降低任務處理延遲。 - 高效同步
結合原子操作與停放區,減少鎖競爭,保證高吞吐量。 - 精細化監控
通過bvar
提供詳盡的運行時指標,助力性能分析與優化。
6 使用場景示例
高并發服務:
- Web服務器接收請求后,封裝為bthread任務,按業務類型打標簽。
TaskControl
根據標簽分發任務到不同組,保證關鍵業務優先調度。- 工作線程動態擴展應對流量高峰,空閑時自動縮減節省資源。
- 監控指標實時反饋系統負載,輔助容量規劃。
7 總結
TaskControl
是BRPC bthread調度系統的核心,通過高效的任務組管理、工作線程調度及細粒度監控,實現了高并發、低延遲的協程任務處理。其設計充分考慮了擴展性、性能與可觀測性,是構建高性能C++服務的基石組件。
8 學習過程中的疑問
8.1 init函數為什么不在構造函數中調用
疑問:init函數在注釋中聲明需要在使用前調用,為什么不能將其放在構造函數中直接調用呢?
回答:
-
可能原因:
-
- 初始化可能失敗,需要錯誤處理
- 構造函數沒有返回值,若在構造函數中執行可能的失敗操作(eg. 創建線程、分配資源),只能通過異常或設置內部狀態標記錯誤。brpc代碼風格傾向于避免異常,習慣于返回錯誤碼。參考代碼8-1
-
- 需要依賴外部參數
- 構造時參數可能不完整,TaskControl的初始化需要并發線程數等參數,可能在運行時動態確定,無法在編譯器硬編碼。
- 更復雜的重載,如果后續需要擴展初始化參數(eg. 增加timeout/policy等配置),顯式init()更容易擴展,而構造函數重載會膨脹。
- 3 支持對象的復用
- 銷毀后重新初始化,某些場景下,用戶可能希望銷毀TaskControl后重新初始化(eg. 動態調整線程池的大小)。若初始化邏輯在構造函數中,則需要先析構對象再重新構造,而顯式init/stop_and_join允許復用同一對象。參考代碼8-2
- 4 明確的二段式生命周期
- 分離資源分配與初始化,二段式設計(構造+init())將對喜愛嗯的內存分配和資源初始化解耦:
- 構造階段:僅進行內存布局、簡單成員初始化;
- 初始化階段:執行重量級操作(eg. 創建線程、連接資源);
- 更符合RAII的變體模式,尤其是在需要延遲初始化時。
- 分離資源分配與初始化,二段式設計(構造+init())將對喜愛嗯的內存分配和資源初始化解耦:
- 5 避免隱藏的副作用
- 隱式初始化可能引入意外行為,若構造函數自動初始化,用戶可能在不知情的情況下觸發資源分配(eg. 線程創建)。顯式init()強制用戶主動控制初始化時機,避免副作用。
- 6 與brpc其他組件的設計一致性
- 統一風格,brpc很多組件(eg. Channel / Server)均采用類似的二段式模式(先構造,再調用init() / start()),保持代碼風格統一,降低用戶學習成本。
-
-
什么情況下應在構造函數中初始化?
- 輕量級且無失敗可能的操作,eg. 設置默認參數、初始化原子計數器等。
- 強制一次性初始化,若對象必須在構造時完全初始化,且不允許重新初始化。
-
兩種方式對比,見代碼8-3
-
二段式的作用
- 清晰的錯誤處理:通過返回int明確傳遞錯誤
- 參數靈活性:允許運行時動態決定初始化參數
- 對象復用:支持重新初始化而不重新構造
- 代碼一致性:符合BRPC設計慣例
代碼8-1:
// 當前使用方法
TaskControl ctl;
if (ctl.init(32) != 0) {// 處理初始化失敗
}// 如果放在構造函數中
TaskControl ctl(32);
if (!ctl.is_initialized()) {// 處理錯誤
}
代碼8-2
TaskControl ctl;
ctl.init(16);
ctl.stop_and_join();
ctl.init(32);
代碼8-3
// 二段式
class TaskControl {
public:TaskControl(); // 輕量構造~TaskControl();int init(int nconcurrency); // 顯式初始化// ...
};// 使用方式
TaskControl ctl;
if (ctl.init(32) != 0) {LOG(ERROR) << "Failed to initialize TaskControl";return -1;
}// 合并到構造方式
class TaskControl {
public:explicit TaskControl(int nconcurrency); // 可能拋出異常~TaskControl();bool is_initialized() const; // 需額外狀態檢查// ...
};// 使用方式
try {TaskControl ctl(32);
} catch (const std::exception& e) {LOG(ERROR) << "Construction failed: " << e.what();
}
if (!ctl.is_initialized()) { // 需要額外檢查// 處理錯誤
}