【中間件】bthread_基礎_TaskControl

TaskControl

  • 1 Definition
  • 2 Introduce
      • **核心職責**
  • 3 成員解析
    • **3.1 數據結構與線程管理**
    • **3.2 任務調度與負載均衡**
    • **3.3 線程停放與喚醒(ParkingLot)**
    • **3.4 統計與監控**
  • 4 **工作流程**
  • 5 **設計亮點**
  • 6 **使用場景示例**
  • 7 **總結**
  • 8 學習過程中的疑問
    • 8.1 init函數為什么不在構造函數中調用

1 Definition

class TaskControl {friend class TaskGroup; // 友元類friend void wait_for_butex(void*); // 友元函數
#ifdef BRPC_BTHREAD_TRACERfriend bthread_t init_for_pthread_stack_trace(); // 友元函數
#endif // BRPC_BTHREAD_TRACERpublic:TaskControl();~TaskControl();// Must be called before using. `nconcurrency' is # of worker pthreads.int init(int nconcurrency);// Create a TaskGroup in this control.TaskGroup* create_group(bthread_tag_t tag);// Steal a task from a "random" group.bool steal_task(bthread_t* tid, size_t* seed, size_t offset);// Tell other groups that `n' tasks was just added to caller's runqueuevoid signal_task(int num_task, bthread_tag_t tag);// Stop and join worker threads in TaskControl.void stop_and_join();// Get # of worker threads.int concurrency() const { return _concurrency.load(butil::memory_order_acquire); }int concurrency(bthread_tag_t tag) const { return _tagged_ngroup[tag].load(butil::memory_order_acquire); }void print_rq_sizes(std::ostream& os);double get_cumulated_worker_time();double get_cumulated_worker_time_with_tag(bthread_tag_t tag);int64_t get_cumulated_switch_count();int64_t get_cumulated_signal_count();// [Not thread safe] Add more worker threads.// Return the number of workers actually added, which may be less than |num|int add_workers(int num, bthread_tag_t tag);// Choose one TaskGroup (randomly right now).// If this method is called after init(), it never returns NULL.TaskGroup* choose_one_group(bthread_tag_t tag);#ifdef BRPC_BTHREAD_TRACER// A stacktrace of bthread can be helpful in debugging.void stack_trace(std::ostream& os, bthread_t tid);std::string stack_trace(bthread_t tid);
#endif // BRPC_BTHREAD_TRACERprivate:typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;static const int PARKING_LOT_NUM = 4;typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot;// Add/Remove a TaskGroup.// Returns 0 on success, -1 otherwise.int _add_group(TaskGroup*, bthread_tag_t tag);int _destroy_group(TaskGroup*);// Tag groupTaggedGroups& tag_group(bthread_tag_t tag) { return _tagged_groups[tag]; }// Tag ngroupbutil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; }// Tag parking slotTaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; }static void delete_task_group(void* arg);static void* worker_thread(void* task_control);template <typename F>void for_each_task_group(F const& f);bvar::LatencyRecorder& exposed_pending_time();bvar::LatencyRecorder* create_exposed_pending_time();bvar::Adder<int64_t>& tag_nworkers(bthread_tag_t tag);bvar::Adder<int64_t>& tag_nbthreads(bthread_tag_t tag);std::vector<butil::atomic<size_t>> _tagged_ngroup;std::vector<TaggedGroups> _tagged_groups;butil::Mutex _modify_group_mutex;butil::atomic<bool> _init;  // if not init, bvar will case coredumpbool _stop;butil::atomic<int> _concurrency;std::vector<pthread_t> _workers;butil::atomic<int> _next_worker_id;bvar::Adder<int64_t> _nworkers;butil::Mutex _pending_time_mutex;butil::atomic<bvar::LatencyRecorder*> _pending_time;bvar::PassiveStatus<double> _cumulated_worker_time;bvar::PerSecond<bvar::PassiveStatus<double> > _worker_usage_second;bvar::PassiveStatus<int64_t> _cumulated_switch_count;bvar::PerSecond<bvar::PassiveStatus<int64_t> > _switch_per_second;bvar::PassiveStatus<int64_t> _cumulated_signal_count;bvar::PerSecond<bvar::PassiveStatus<int64_t> > _signal_per_second;bvar::PassiveStatus<std::string> _status;bvar::Adder<int64_t> _nbthreads;std::vector<bvar::Adder<int64_t>*> _tagged_nworkers;std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second;std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;std::vector<TaggedParkingLot> _pl;#ifdef BRPC_BTHREAD_TRACERTaskTracer _task_tracer;
#endif // BRPC_BTHREAD_TRACER};

2 Introduce

TaskControl作為任務調度控制中心,管理多個任務組(TaskGroup)并協調工作線程的高效運作,適用于BRPC的bthread協程庫:

核心職責

  1. 任務組管理:創建、銷毀任務組,支持按標簽(bthread_tag_t)分類管理。
  2. 線程池調度:動態調整工作線程數量,實現任務竊取(Work Stealing)以平衡負載。
  3. 同步與喚醒:通過停放區(ParkingLot)管理線程的休眠與喚醒。
  4. 性能監控:集成統計模塊(bvar)跟蹤任務處理時間、切換次數等指標。

3 成員解析

3.1 數據結構與線程管理

  • _tagged_groups
    類型:std::vector<TaggedGroups>
    作用:按標簽存儲任務組指針數組,每個標簽對應一個TaggedGroups(固定大小為BTHREAD_MAX_CONCURRENCY的數組)。
    示例:標簽可用于區分不同業務優先級或租戶的任務。

  • _tagged_ngroup
    類型:std::vector<butil::atomic<size_t>>
    作用:記錄每個標簽下的任務組數量,原子操作保證線程安全。

  • _workers
    類型:std::vector<pthread_t>
    作用:保存所有工作線程的ID,用于線程生命周期管理(啟動、停止、回收)。

  • _concurrency
    類型:butil::atomic<int>
    作用:總工作線程數,支持原子讀寫,動態調整并發度。

3.2 任務調度與負載均衡

  • steal_task(bthread_t* tid, size_t* seed, size_t offset)
    作用:從其他任務組竊取任務,避免工作線程空閑。
    實現:

    • 使用seed隨機選擇目標組,結合offset避免多個線程競爭同一隊列。
    • 竊取成功返回true,任務ID存入tid
  • signal_task(int num_task, bthread_tag_t tag)
    作用:通知任務組有新任務加入,觸發喚醒機制。
    場景:當任務被添加到隊列時,調用此方法喚醒可能休眠的線程。

  • choose_one_group(bthread_tag_t tag)
    作用:根據標簽選擇一個任務組,用于任務分發或負載均衡。
    策略:可能采用輪詢或隨機算法選擇組,確保任務均勻分配。

3.3 線程停放與喚醒(ParkingLot)

  • _pl
    類型:std::vector<TaggedParkingLot>
    作用:按標簽管理的停放區數組,每個標簽對應PARKING_LOT_NUM個停放區。
    機制:
    • 工作線程無任務時進入停放區等待,減少CPU空轉。
    • 新任務到達時,通過停放區喚醒線程,降低延遲。

3.4 統計與監控

  • bvar集成
    關鍵指標:

    • _cumulated_worker_time:累計任務處理時間。
    • _cumulated_switch_count:上下文切換次數。
    • _signal_per_second:每秒任務喚醒次數。
      作用:通過BRPC的bvar庫暴露性能指標,方便實時監控與調優。
  • 標簽化統計
    成員如_tagged_nworkers_tagged_cumulated_worker_time等,按標簽細分統計,支持多維分析。

4 工作流程

  1. 初始化

    • 調用init(nconcurrency)創建指定數量工作線程,每個線程執行worker_thread函數。
    • 工作線程通過choose_one_group選擇任務組,執行任務循環。
  2. 任務執行

    • 線程從本地任務組獲取任務,若隊列為空,嘗試從其他組竊取(steal_task)。
    • 無任務可執行時,進入停放區(ParkingLot)休眠。
  3. 任務通知

    • 添加新任務時,調用signal_task遞增信號計數器,喚醒停放區線程。
  4. 動態擴縮容

    • add_workers動態增加指定標簽的工作線程,適應負載變化。
  5. 停止與清理

    • stop_and_join設置_stop標志,通知所有線程退出,并回收資源。

5 設計亮點

  • 標簽化分組
    支持多維度任務分類,適用于混合部署場景(如不同服務優先級)。
  • 任務竊取
    避免工作線程閑置,提升CPU利用率,降低任務處理延遲。
  • 高效同步
    結合原子操作與停放區,減少鎖競爭,保證高吞吐量。
  • 精細化監控
    通過bvar提供詳盡的運行時指標,助力性能分析與優化。

6 使用場景示例

高并發服務

  • Web服務器接收請求后,封裝為bthread任務,按業務類型打標簽。
  • TaskControl根據標簽分發任務到不同組,保證關鍵業務優先調度。
  • 工作線程動態擴展應對流量高峰,空閑時自動縮減節省資源。
  • 監控指標實時反饋系統負載,輔助容量規劃。

7 總結

TaskControl是BRPC bthread調度系統的核心,通過高效的任務組管理、工作線程調度及細粒度監控,實現了高并發、低延遲的協程任務處理。其設計充分考慮了擴展性、性能與可觀測性,是構建高性能C++服務的基石組件。

8 學習過程中的疑問

8.1 init函數為什么不在構造函數中調用

疑問:init函數在注釋中聲明需要在使用前調用,為什么不能將其放在構造函數中直接調用呢?
回答:

  • 可能原因:

      1. 初始化可能失敗,需要錯誤處理
      • 構造函數沒有返回值,若在構造函數中執行可能的失敗操作(eg. 創建線程、分配資源),只能通過異常或設置內部狀態標記錯誤。brpc代碼風格傾向于避免異常,習慣于返回錯誤碼。參考代碼8-1
      1. 需要依賴外部參數
      • 構造時參數可能不完整,TaskControl的初始化需要并發線程數等參數,可能在運行時動態確定,無法在編譯器硬編碼。
      • 更復雜的重載,如果后續需要擴展初始化參數(eg. 增加timeout/policy等配置),顯式init()更容易擴展,而構造函數重載會膨脹。
    • 3 支持對象的復用
      • 銷毀后重新初始化,某些場景下,用戶可能希望銷毀TaskControl后重新初始化(eg. 動態調整線程池的大小)。若初始化邏輯在構造函數中,則需要先析構對象再重新構造,而顯式init/stop_and_join允許復用同一對象。參考代碼8-2
    • 4 明確的二段式生命周期
      • 分離資源分配與初始化,二段式設計(構造+init())將對喜愛嗯的內存分配和資源初始化解耦:
        • 構造階段:僅進行內存布局、簡單成員初始化;
        • 初始化階段:執行重量級操作(eg. 創建線程、連接資源);
      • 更符合RAII的變體模式,尤其是在需要延遲初始化時。
    • 5 避免隱藏的副作用
      • 隱式初始化可能引入意外行為,若構造函數自動初始化,用戶可能在不知情的情況下觸發資源分配(eg. 線程創建)。顯式init()強制用戶主動控制初始化時機,避免副作用。
    • 6 與brpc其他組件的設計一致性
      • 統一風格,brpc很多組件(eg. Channel / Server)均采用類似的二段式模式(先構造,再調用init() / start()),保持代碼風格統一,降低用戶學習成本。
  • 什么情況下應在構造函數中初始化?

    • 輕量級且無失敗可能的操作,eg. 設置默認參數、初始化原子計數器等。
    • 強制一次性初始化,若對象必須在構造時完全初始化,且不允許重新初始化。
  • 兩種方式對比,見代碼8-3

  • 二段式的作用

    • 清晰的錯誤處理:通過返回int明確傳遞錯誤
    • 參數靈活性:允許運行時動態決定初始化參數
    • 對象復用:支持重新初始化而不重新構造
    • 代碼一致性:符合BRPC設計慣例

代碼8-1:

// 當前使用方法
TaskControl ctl;
if (ctl.init(32) != 0) {// 處理初始化失敗
}// 如果放在構造函數中
TaskControl ctl(32);
if (!ctl.is_initialized()) {// 處理錯誤
}

代碼8-2

TaskControl ctl;
ctl.init(16);
ctl.stop_and_join();
ctl.init(32);

代碼8-3

// 二段式
class TaskControl {
public:TaskControl();  // 輕量構造~TaskControl();int init(int nconcurrency);  // 顯式初始化// ...
};// 使用方式
TaskControl ctl;
if (ctl.init(32) != 0) {LOG(ERROR) << "Failed to initialize TaskControl";return -1;
}// 合并到構造方式
class TaskControl {
public:explicit TaskControl(int nconcurrency);  // 可能拋出異常~TaskControl();bool is_initialized() const;  // 需額外狀態檢查// ...
};// 使用方式
try {TaskControl ctl(32);
} catch (const std::exception& e) {LOG(ERROR) << "Construction failed: " << e.what();
}
if (!ctl.is_initialized()) {  // 需要額外檢查// 處理錯誤
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/79232.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/79232.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/79232.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

win11 終端 安裝ffmpeg 使用終端Scoop

1、安裝scoop (Windows 包管理器) Set-ExecutionPolicy RemoteSigned -Scope CurrentUser iwr -useb get.scoop.sh | iex 2、使用scoop來安裝ffmpeg scoop install ffmpeg 3、測試一下ffmpeg&#xff0c;將Mp3文件轉為Wav文件 ffmpeg -i A.mp3 A.wav 然后我們就看到A.wav生成…

力扣838.推多米諾隨筆

“生活就像海洋&#xff0c;只有意志堅強的人&#xff0c;才能到達彼岸。”—— 馬克思 題目 n 張多米諾骨牌排成一行&#xff0c;將每張多米諾骨牌垂直豎立。在開始時&#xff0c;同時把一些多米諾骨牌向左或向右推。 每過一秒&#xff0c;倒向左邊的多米諾骨牌會推動其左側…

超級好用的??參數化3D CAD 建模??圖形庫 (CadQuery庫介紹)

CadQuery 庫詳細介紹?? ??CadQuery?? 是一個基于 ??Python?? 的 ??參數化 3D CAD 建模?? 庫&#xff0c;允許用戶通過編寫代碼&#xff08;而不是傳統 GUI&#xff09;來創建精確的 ??3D 模型??。它特別適用于 ??自動化設計、機械工程、3D 打印?? 等場景…

HBM的哪些事

命令操作 這也許是DDR往HBM演進的一些奇淫技巧。 本篇內容屬于雜談&#xff0c;關于HBM的奇淫技巧&#xff0c;隨后出專題介紹。

Python基于深度學習的網絡輿情分析系統(附源碼,部署)

大家好&#xff0c;我是Python徐師兄&#xff0c;一個有著7年大廠經驗的程序員&#xff0c;也是一名熱衷于分享干貨的技術愛好者。平時我在 CSDN、掘金、華為云、阿里云和 InfoQ 等平臺分享我的心得體會。 &#x1f345;文末獲取源碼聯系&#x1f345; 2025年最全的計算機軟件畢…

滑動窗口leetcode 209和76

一、leetcode 209. 長度最小的子數組 代碼&#xff1a; class Solution { public:int minSubArrayLen(int target, vector<int>& nums) {int n nums.size();int left 0;int sum 0;int res 100001;for(int right 0;right <n;right){sum nums[right];while(s…

node.js 實戰——mongoDB 續一

mongoDB的基本指令 進入mongodb mongo顯示當前的所有數據庫 show dbs # 或者 show databases切換數據庫/進入指定數據庫 使用這個命令的時候&#xff0c;是不要求這個數據庫是否創建 use 數據庫名顯示當前數據庫 db顯示數據庫中所有集合 show collections數據庫的CRUD的…

SVMSPro平臺獲取Websocket視頻流規則

SVMSPro平臺獲取Websocket視頻流規則 Websocket 的服務端口為&#xff1a;53372&#xff0c;如需要公網訪問需要開啟這個端口 這里講的是如何獲取長效URL&#xff0c;短效&#xff08;時效性&#xff09;URL也支持&#xff0c;下回講 一、如何獲取Websocket實時流視頻 ws:/…

Arduino按鍵開關編程詳解

一、按鍵開關的基本原理與硬件連接 1.1 按鍵開關的工作原理 按鍵開關是一種常見的輸入設備&#xff0c;其核心原理基于機械觸點的閉合與斷開。當用戶按下按鍵時&#xff0c;內部的金屬片會連接電路兩端&#xff0c;形成通路&#xff1b;松開按鍵后&#xff0c;金屬片在彈簧作…

我的日記雜文

Sequoia sempervirens 北美紅杉樹 Troll 洞穴巨人 喜歡在網上搞事的人 piss off 滾開 讓人惱火的 歐洲美甲 60euor - 30euro 拖車 mobie house Motel 汽車旅館 Minoxidil 米諾地爾 Health insurance 醫療保險 casetify 香港手機品牌 coolant 汽車防凍液 Auto tint film 汽車貼…

數字智慧方案5867丨智慧建造(BIM技術智慧工地)在施工階段的實踐與應用方案(90頁PPT)(文末有下載方式)

資料解讀&#xff1a;智慧建造(BIM技術智慧工地)在施工階段的實踐與應用方案 詳細資料請看本解讀文章的最后內容。 在當今的建筑行業中&#xff0c;智慧建造已成為提升施工效率和質量的關鍵手段。隨著科技的進步&#xff0c;智慧建造結合了物聯網、大數據、人工智能等技術&am…

機器學習中的標簽策略:直接標簽、代理標簽與人工數據生成

機器學習中的標簽策略&#xff1a;直接標簽、代理標簽與人工數據生成 摘要 本文深入探討了機器學習領域中標簽的關鍵概念&#xff0c;包括直接標簽與代理標簽的定義、優缺點比較&#xff0c;以及人工生成數據的相關內容。通過詳細實例和練習&#xff0c;幫助讀者理解如何選擇…

從0搭建Transformer

1. 位置編碼模塊&#xff1a; import torch import torch.nn as nn import mathclass PositonalEncoding(nn.Module):def __init__ (self, d_model, dropout, max_len5000):super(PositionalEncoding, self).__init__()self.dropout nn.Dropout(pdropout)# [[1, 2, 3],# [4, 5…

【Bootstrap V4系列】學習入門教程之 表格(Tables)和畫像(Figure)

Bootstrap V4系列 學習入門教程之 表格&#xff08;Tables&#xff09;和畫像&#xff08;Figure&#xff09; 表格&#xff08;Tables&#xff09;一、Examples二、Table head options 表格頭選項三、Striped rows 條紋行四、Bordered table 帶邊框的表格五、Borderless table…

在C# WebApi 中使用 Nacos02: 配置管理、服務管理實戰

一、配置管理 1.添加一個新的命名空間 這里我都填寫為publicdemo 2.C#代碼配置啟動 appsetting.json加上&#xff1a; (nacos默認是8848端口) "NacosConfig": {"ServerAddresses": [ "http://localhost:8848" ], // Nacos 服務器地址"Na…

如何搭建spark yarn 模式的集群集群。

下載 App 如何搭建spark yarn 模式的集群集群。 搭建Spark on YARN集群的詳細步驟 Spark on YARN模式允許Spark作業在Hadoop YARN資源管理器上運行&#xff0c;利用YARN進行資源調度。以下是搭建步驟&#xff1a; 一、前提條件 已安裝并配置好的Hadoop集群&#xff08;包括HDF…

C++--入門基礎

C入門基礎 1. C的第一個程序 C繼承C語言許多大多數的語法&#xff0c;所以以C語言實現的hello world也可以運行&#xff0c;C中需要把文件定義為.cpp&#xff0c;vs編譯器看是.cpp就會調用C編譯器編譯&#xff0c;linux下要用g編譯&#xff0c;不再是gcc。 // test.cpp #inc…

從實列中學習linux shell9 如何確認 服務器反應遲鈍是因為cpu還是 硬盤io 到底是那個程序引起的。cpu負載多高算高

在 Linux 系統中,Load Average(平均負載) 是衡量系統整體壓力的關鍵指標,但它本身沒有絕對的“高/低”閾值,需要結合 CPU 核心數 和 其他性能指標 綜合分析。以下是具體判斷方法: 一、Load Average 的基本含義 定義:Load Average 表示 單位時間內處于可運行狀態(R)和不…

聊一聊接口測試更側重于哪方面的驗證

目錄 一、功能性驗證 輸入與輸出正確性 參數校驗 業務邏輯覆蓋 二、數據一致性驗證 數據格式規范 數據完整性 數據類型與范圍 三、異常場景驗證 容錯能力測試 邊界條件覆蓋 錯誤碼與信息清晰度 四、安全與權限驗證 身份認證 數據安全 防攻擊能力 五、性能與可…

Fiddler抓取APP端,HTTPS報錯全解析及解決方案(一篇解決常見問題)

環境&#xff1a;雷電模擬器Android9系統 ? 你所遇到的fiddler中抓取HTTPS的問題可以分為三類&#xff1a;一類是你自己證書安裝上邏輯錯誤&#xff0c;另一種是APP中使用了“證書固定”的手段。三類fiddler中生成證書時的參數過程。 1.Fiddler證書安裝上的邏輯錯誤 更新Opt…