第九章:線程池(ThreadPool)
在第八章《TcpServer》中,我們了解到muduo::net::TcpServer
通過EventLoop
線程池處理入站連接。
這些EventLoop
線程主要負責網絡I/O:套接字讀寫和定時器處理,由Poller
和Channel
協調,保持高效響應。
但當
EventLoop
線程中的MessageCallback
函數需要執行耗時操作時會發生什么?
例如:
- 復雜計算
- 數據庫訪問(可能阻塞等待結果)
- 大文件磁盤寫入(可能阻塞)
- 同步網絡請求其他服務
若直接在EventLoop
線程執行這些操作,線程將被阻塞,導致該線程管理的所有連接無法處理其他事件,服務器響應能力下降
甚至引發超時。
這正是muduo::ThreadPool
的價值所在。
線程池解決的問題
核心目標是將可能阻塞或耗時的計算任務從關鍵EventLoop
線程卸載
。
EventLoop
線程必須保持空閑以快速處理網絡I/O。
ThreadPool
提供專用線程組執行任意任務。當EventLoop
回調中遇到耗時任務時,將其提交給線程池,由池中的工作線程異步處理,使EventLoop
線程快速返回繼續處理網絡事件。
類比場景:
EventLoop
線程如同高效的前臺接待員,僅處理簡短交互(如引導訪客、接收郵件)。ThreadPool
則是后勤團隊,處理復雜請求(如審核詳細申請),前臺人員可立即返回崗位繼續接待。- (類似于
redis
和mysql
實現分類,還有上一篇文章當中的reactor分離
,也是一個前臺+多個處理人員)
ThreadPool:后臺任務執行引擎
muduo::ThreadPool
類管理一組待命的工作線程,關鍵設計要素包括:
- 線程集合:使用
muduo::Thread
類(第二章:線程)創建指定數量的工作線程 - 任務隊列:線程安全的雙端隊列(
std::deque
),通過互斥鎖(MutexLock
)和條件變量(Condition
)實現同步 - 工作線程邏輯:每個線程循環執行:取任務→執行→等待新任務
- 任務提交(
run()
):通過run()
方法將函數對象加入隊列 - 阻塞等待(
take()
):隊列空時,工作線程通過條件變量(notEmpty_
)阻塞休眠 - 喚醒機制:添加新任務時通過
notEmpty_.notify()
喚醒等待線程 - 有界隊列(可選):可設置隊列最大容量(
maxQueueSize_
),滿時run()
可能阻塞(通過notFull_
),防止任務積壓 - 啟停控制:
start()
啟動線程池,stop()
停止并等待線程退出
使用ThreadPool執行異步任務
通過創建實例、啟動線程池并調用run()
提交任務即可使用:
#include "muduo/base/ThreadPool.h"
#include "muduo/base/CurrentThread.h"
#include "muduo/base/CountDownLatch.h"
#include <cstdio>
#include <string>
#include <unistd.h>
#include <functional>// 工作線程中運行的函數
void print(const std::string& msg) {printf("線程池任務: %s, 進程ID: %d, 線程ID: %d\n",msg.c_str(), getpid(), muduo::CurrentThread::tid());sleep(1); // 模擬耗時操作
}int main() {printf("主線程啟動. 進程ID: %d, 線程ID: %d\n", getpid(), muduo::CurrentThread::tid());muduo::ThreadPool pool("MyWorkerPool"); // 1. 創建線程池pool.setMaxQueueSize(5); // 設置隊列最大容量為5pool.start(3); // 2. 啟動3個工作線程sleep(1); // 等待線程啟動printf("線程池已啟動.\n");// 3. 提交10個任務for (int i = 0; i < 10; ++i) {char task_msg[32];snprintf(task_msg, sizeof task_msg, "任務%d", i);pool.run(std::bind(print, std::string(task_msg))); // 提交任務}printf("任務提交完成.\n");sleep(10); // 等待任務執行pool.stop(); // 4. 停止線程池printf("線程池已停止.\n");return 0;
}
(需包含Muduo頭文件并鏈接庫)
使用muduo網絡庫中的線程池組件處理并發任務的基本流程,包含四個關鍵操作:
創建線程池、啟動線程、提交任務、停止線程池。
原理
線程池管理一組工作線程,避免頻繁創建銷毀線程的開銷。
任務提交后進入隊列,空閑線程自動獲取任務執行。
muduo庫的ThreadPool
采用生產者-消費者
模型,主線程提交任務(生產者),工作線程處理任務(消費者)。
解析
初始化階段
-
muduo::ThreadPool pool("MyWorkerPool")
創建名為"MyWorkerPool"的線程池實例。
pool.setMaxQueueSize(5)
限制任務隊列最多容納5個未處理任務
,防止內存過度消耗。pool.start(3)
啟動3個常駐工作線程,這些線程會持續從任務隊列獲取任務執行。啟動后立即進入等待任務狀態。
任務提交階段
-
循環提交10個打印任務,每個任務綁定
print
函數和字符串參數。當任務提交速度超過處理速度時,后提交的任務會因隊列滿(max=5)而阻塞,直到有線程處理完隊列任務。
print
函數顯示任務信息、進程PID和線程TID,通過sleep(1)
模擬耗時操作。注意線程ID由muduo::CurrentThread::tid()
獲取,這是muduo庫提供的跨平臺線程標識。
終止階段
sleep(10)
確保所有任務完成,pool.stop()
安全停止線程池:停止接受新任務,等待正在執行的任務完成,最后回收線程資源。
執行效果說明
程序輸出將顯示:
- 主線程信息
- 3個工作線程循環執行10個任務(每個任務間隔1秒)
- 由于只有3個線程,10個任務需約4秒完成(3并發+隊列等待)
- 最終線程池停止信息
該模式適用于需要批量處理短生命周期任務
的場景,如網絡請求處理、日志寫入等I/O密集型操作。
運行
理解:貨架上有五個空位
,三個三個的往上放
貨物
執行流程:
- 主線程創建線程池并啟動3個工作線程
- 工作線程啟動后立即嘗試從空隊列取任務,進入阻塞等待
- 主循環提交10個任務,工作線程被喚醒并并發執行
- 輸出顯示不同線程ID執行任務,主線程快速返回
- 任務隊列滿時
run()
可能阻塞,直到有空閑 stop()
通知所有線程退出,等待清理
線程池內部機制
核心交互流程:
加鎖
到任務隊列中,再加鎖
的執行任務
關鍵源碼(簡化版):
// ThreadPool.h
class ThreadPool : noncopyable {
public:using Task = std::function<void ()>; // 任務類型explicit ThreadPool(const string& name = "ThreadPool");void setMaxQueueSize(int maxSize); // 設置隊列容量void start(int numThreads); // 啟動線程void stop(); // 停止線程void run(Task f); // 提交任務private:bool isFull() const; // 隊列是否滿Task take(); // 獲取任務void runInThread(); // 工作線程主循環mutable MutexLock mutex_; // 保護隊列和狀態Condition notEmpty_; // 非空條件變量Condition notFull_; // 非滿條件變量std::deque<Task> queue_; // 任務隊列size_t maxQueueSize_ = 0; // 隊列最大容量bool running_ = false; // 運行狀態
};// ThreadPool.cc
void ThreadPool::start(int numThreads) {running_ = true;// 創建numThreads個工作線程threads_.reserve(numThreads);for (int i = 0; i < numThreads; ++i) {threads_.emplace_back(new Thread(std::bind(&ThreadPool::runInThread, this)));threads_.back()->start();}
}void ThreadPool::run(Task task) {if (threads_.empty()) {task(); // 無線程時直接執行} else {MutexLockGuard lock(mutex_);while (isFull() && running_) {notFull_.wait(); // 隊列滿時等待}queue_.push_back(std::move(task));notEmpty_.notify(); // 通知工作線程}
}ThreadPool::Task ThreadPool::take() {MutexLockGuard lock(mutex_);while (queue_.empty() && running_) {notEmpty_.wait(); // 隊列空時等待}Task task;if (!queue_.empty()) {task = queue_.front();queue_.pop_front();if (maxQueueSize_ > 0) {notFull_.notify(); // 通知任務提交者}}return task;
}void ThreadPool::runInThread() {while (running_) {Task task(take()); // 循環取任務if (task) task(); // 執行任務}
}
?核心功能
任務隊列管理
ThreadPool 使用 std::deque<Task>
作為任務隊列,通過 maxQueueSize_
控制隊列容量。
任務類型為 std::function<void()>
,表示無參數無返回值的可調用對象。
線程工作流程
- 啟動時通過
start()
創建多個工作線程,每個線程執行runInThread()
循環。 runInThread()
不斷調用take()
獲取任務并執行。若隊列為空,線程在notEmpty_
條件變量上等待。take()
從隊列頭部取出任務,若隊列從滿變為非滿,通知notFull_
喚醒可能阻塞的任務提交者。
任務提交邏輯
run()
提交任務時,若隊列已滿且線程池在運行,提交線程在notFull_
上等待。- 任務入隊后通過
notEmpty_.notify()
喚醒一個工作線程。 - 若線程池未啟動或無工作線程,任務直接在當前線程執行。
同步機制
MutexLock
保護隊列和狀態變量(running_
)。- 兩個
Condition
變量(notEmpty_
、notFull_
)實現生產者-消費者模型,避免忙等待。
代碼設計簡潔高效,適合處理大量短期任務,典型應用如網絡服務器的請求處理。
總結
功能 | 描述 | 作用/優勢 |
---|---|---|
工作線程 | 基于muduo::Thread 的線程集合 | 提供獨立于I/O線程的任務執行環境 |
任務隊列 | 線程安全的雙端隊列存儲std::function<void () 類型任務 | 解耦任務提交與執行,支持異步處理 |
run()方法 | 提交任務到隊列,隊列滿時可能阻塞 | 用戶接口,確保任務安全加入隊列 |
take()方法 | 工作線程從隊列取任務,隊列空時阻塞 | 核心任務獲取機制,保證工作線程高效等待 |
runInThread() | 工作線程主循環,執行take() 和任務 | 定義工作線程行為,維持任務處理循環 |
互斥鎖 | 保護隊列和運行狀態 | 確保多線程訪問共享數據的正確性 |
條件變量 | notEmpty_ 和notFull_ 協調線程間通信 | 實現高效的任務調度與資源管理 |
有界隊列 | 通過maxQueueSize_ 限制隊列容量 | 防止任務堆積導致內存溢出,提供背壓機制 |
結論
muduo::ThreadPool
是Muduo庫中處理異步任務的核心組件,通過線程安全的任務隊列和工作線程池,有效隔離耗時操作與網絡I/O處理。其價值體現在:
- 資源隔離:保護
EventLoop
線程免受阻塞,確保網絡高吞吐 - 彈性擴展:通過線程數配置適應不同計算負載
- 流量控制:有界隊列防止資源耗盡
- 簡化并發:
封裝底層線程同步細節
,提供簡潔API
下一章我們將探討TcpClient
類,了解如何主動連接遠程服務并管理TCP連接。
第十章:TcpClient
第10章:TcpClient
-
在上一章第9章:線程池中,我們討論了
muduo::ThreadPool
如何通過卸載阻塞任務來保持EventLoop
線程的響應能力。 -
在此之前,我們探討了
TcpServer
(第8章:TcpServer),它允許使用EventLoop
線程池構建監聽并管理
多個傳入TCP連接的服務器。
但如果想編寫主動連接到遠程服務器的程序(而不是等待連接)呢?
這就是網絡客戶端的作用。
-
需要一種方法來連接到特定地址和端口,處理連接建立的
異步性
,管理連接后的數據交換,并可能需要處理連接失敗和重試。 -
手動實現這些需要創建套接字、設置為
非阻塞模式
、發起connect(2)
系統調用(在非阻塞模式下會立即返回但稍后完成連接)、使用EventLoop
和Channel
監視套接字的可寫性(表示連接完成)或錯誤,然后管理已連接的套接字
。
這非常復雜,尤其是在需要自動重連
等功能時。
這正是muduo::net::TcpClient
要解決的問題。
TcpClient 解決了什么問題?
muduo::net::TcpClient
為TCP連接的客戶端提供了高級抽象。它處理以下完整流程:
- 發起連接:創建非阻塞套接字并調用
connect()
- 監控連接狀態:使用
EventLoop
等待連接成功建立或失敗 - 管理連接:連接成功后,創建并管理單個
TcpConnection
對象(第6章:TcpConnection)來處理數據傳輸(發送和接收),使用其EventLoop
和Buffer
(第7章:Buffer) - 處理斷開連接:在連接關閉時通知
- 自動重試(可選):如果啟用,在初始連接失敗或連接丟失時自動嘗試重連
將
TcpClient
想象成一位專門代表,其職責是呼叫
另一個辦公室(TcpServer
)并建立單條通信線路。
它處理撥號、等待對方接聽以及通話接通后的線路管理。
如果線路忙或通話中斷,可以配置自動重撥。
TcpClient:連接發起者
muduo::net::TcpClient
是在Muduo中創建TCP客戶端應用程序的主要類。
以下是muduo::net::TcpClient
的關鍵概念:
- 建立單一連接:與
TcpServer
不同,TcpClient
對象設計用于建立和管理到一個特定遠程服務器地址的連接,不處理多個傳入連接 - 由單個EventLoop擁有:
TcpClient
對象必須在單個EventLoop
線程中存在和使用,通常是客戶端的主循環。所有回調和內部操作都在此循環線程中發生 - 擁有Connector:內部使用
Connector
對象,該組件負責異步connect()
系統調用、監視套接字通道的連接完成/失敗,并實現重試邏輯 - 擁有TcpConnection(連接時):當
Connector
成功建立連接后,TcpClient
會為新套接字創建TcpConnection
對象,所有數據I/O都通過該對象進行 - 使用相同回調:與
TcpServer
類似,通過熟悉的ConnectionCallback
、MessageCallback
和WriteCompleteCallback
暴露連接生命周期和數據事件 - 連接/斷開/停止控制:提供
connect()
、disconnect()
、stop()
方法控制客戶端狀態 - 自動重試:
enableRetry()
方法允許在連接失敗或丟失時啟用自動重連
使用TcpClient:連接到回聲服務器
讓我們編寫一個連接到第8章:TcpServer中構建的回聲服務器的簡單客戶端。
該客戶端將連接、發送消息、接收回聲并斷開連接。
需要:
- 客戶端的
EventLoop
- 服務器地址的
InetAddress
- 回調函數
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/InetAddress.h"
#include "muduo/base/Logging.h" // 用于LOG_INFO
#include <cstdio> // 用于printf
#include <string> // 用于std::string// 我們*單條連接*的TcpConnectionPtr(shared_ptr指向TcpConnection)
muduo::net::TcpConnectionPtr clientConnection;// 示例連接回調
void onConnection(const muduo::net::TcpConnectionPtr& conn) {LOG_INFO << "客戶端連接 " << (conn->connected() ? "建立" : "斷開");if (conn->connected()) {// 連接建立時存儲連接指針clientConnection = conn;LOG_INFO << "已連接到 " << conn->peerAddress().toIpPort();// 發送初始消息std::string message = "你好,來自客戶端!\n";printf("發送: %s", message.c_str());conn->send(message);} else {// 連接斷開,清空存儲的指針clientConnection.reset();LOG_INFO << "已斷開與 " << conn->peerAddress().toIpPort() << " 的連接";// 實際應用中可在此處調用loop.quit()或處理重連conn->getLoop()->quit(); // 簡單示例:斷開時退出循環}
}// 示例消息回調(處理回聲數據)
void onMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buf,muduo::Timestamp receiveTime) {LOG_INFO << "從 " << conn->name() << " 接收到 " << buf->readableBytes() << " 字節";// 將回聲數據轉為字符串std::string msg = buf->retrieveAllAsString();printf("收到回聲消息: %s", msg.c_str());// 簡單示例:收到回聲后發送下一條消息// 實際協議中應處理消息并決定是否響應// std::string nextMessage = "另一條消息...\n";// conn->send(nextMessage); // 按需持續發送
}int main() {LOG_INFO << "main(): 進程ID = " << getpid();muduo::net::EventLoop loop; // 客戶端的事件循環// 服務器地址:localhost (127.0.0.1),端口9988muduo::net::InetAddress serverAddr("127.0.0.1", 9988);// 創建TcpClient實例muduo::net::TcpClient client(&loop, serverAddr, "EchoClient");// --- 配置客戶端 ---// 設置回調client.setConnectionCallback(onConnection);client.setMessageCallback(onMessage);// 可選:啟用連接失敗/斷開自動重連// client.enableRetry();// LOG_INFO << "已啟用重連";// --- 啟動連接過程 ---printf("啟動客戶端,正在連接到 %s...\n", serverAddr.toIpPort().c_str());client.connect();// --- 運行客戶端循環 ---printf("運行客戶端循環...\n");loop.loop(); // 阻塞直到調用loop.quit()// loop.loop()在連接關閉且onConnection調用loop->quit()后退出printf("客戶端已停止。main()退出。\n");return 0;
}
(需要包含必要的Muduo頭文件并鏈接庫)
這段代碼是使用muduo網絡庫(一個高性能C++網絡庫)實現的TCP客戶端程序,負責連接到指定服務器
,發送初始消息 并接收服務器返回的數據(回聲)。
關鍵組件說明
必要頭文件
TcpClient.h
:客戶端核心類EventLoop.h
:事件循環處理類InetAddress.h
:網絡地址封裝類Logging.h
:日志輸出工具
主要執行流程
全局連接對象
muduo::net::TcpConnectionPtr clientConnection; // 保存當前有效連接
連接狀態回調
void onConnection(const muduo::net::TcpConnectionPtr& conn) {// 連接建立時:// 1. 存儲連接對象// 2. 發送初始問候消息// 連接斷開時:// 1. 清除連接對象// 2. 退出事件循環
}
數據接收回調
void onMessage(/*參數略*/) {// 1. 讀取接收緩沖區數據// 2. 打印接收內容(示例為回聲服務)// 3. 可擴展發送后續消息
}
主程序邏輯
int main() {// 1. 創建事件循環對象// 2. 指定服務器地址(127.0.0.1:9988)// 3. 創建客戶端實例// 4. 綁定回調函數// 5. 發起連接請求// 6. 運行事件循環(持續處理網絡事件)
}
運行
- 客戶端啟動后自動連接服務器
- 連接成功后立即發送"你好,來自客戶端!"
- 接收服務器返回的相同消息(回聲)
- 斷開連接時自動退出程序
注:實際使用時需配合對應的回聲服務器運行,示例默認使用本地127.0.0.1:9988
main()
關鍵部分解析:
muduo::net::EventLoop loop;
:創建客戶端的單一事件循環muduo::net::InetAddress serverAddr("127.0.0.1", 9988);
:指定服務器地址muduo::net::TcpClient client(...);
:創建關聯事件循環和服務器地址的客戶端實例client.setConnectionCallback(...);
:設置連接狀態變更回調client.connect();
:啟動異步連接過程loop.loop();
:啟動事件循環
TcpClient內部機制:Connector與狀態管理
TcpClient
依賴內部Connector
對象管理連接建立階段和重試,同時管理狀態標志和用于保存連接的shared_ptr
。
調用client.connect()
時的流程:
核心源碼解析
TcpClient.h
關鍵成員(詳見附帶的代碼):
class TcpClient : noncopyable {public:// 構造函數TcpClient(EventLoop* loop, const InetAddress& serverAddr, const string& nameArg);~TcpClient();void connect(); // 啟動連接void disconnect(); // 優雅斷開void stop(); // 停止連接和重試// 設置用戶回調void setConnectionCallback(ConnectionCallback cb);void setMessageCallback(MessageCallback cb);private:EventLoop* loop_;ConnectorPtr connector_; // 連接處理器TcpConnectionPtr connection_; // 當前連接// ... 其他成員
};
Connector
類(詳見附帶的Connector.h/cc
)關鍵方法:
start()
:由TcpClient::connect()
調用,加入事件循環隊列handleWrite()
:處理連接完成事件retry()
:實現帶退避策略的重連機制
總結
功能 | 描述 | 優勢 |
---|---|---|
連接發起器 | 啟動與遠程服務器的連接過程 | 提供高級連接建立抽象 |
EventLoop綁定 | 完全在單個事件循環線程內運行 | 簡化并發控制 |
Connector組件 | 處理異步connect 和重試邏輯 | 封裝復雜的狀態管理 |
單一連接管理 | 通過TcpConnection 管理活動連接 | 簡化客戶端連接管理邏輯 |
標準回調接口 | 使用與TcpServer 相同的回調機制 | 統一編程模型 |
自動重試機制 | 支持可配置的退避重試策略 | 提升客戶端容錯能力 |
結論
muduo::net::TcpClient
是構建Muduo客戶端應用的核心抽象。
-
通過內部
Connector
封裝異步連接建立的復雜性,并通過標準TcpConnection
對象管理通信。 -
通過回調機制定義連接生命周期行為,配合可選的重試功能,可輕松構建高可靠的網絡客戶端。
-
其
單線程
設計模型與TcpServer
形成完美互補,共同構成Muduo網絡庫的核心架構。
完結撒花~
test code: https://github.com/lvy010/Common-C-_Lib/tree/main/test