📚 C++ 中實現 Task::WhenAll
和 Task::WhenAny
的兩種方案
引用:
拈朵微笑的花
想一番人世變換
到頭來輸贏又何妨
日與夜互消長
富與貴難久長
今早的容顏老於昨晚
- C++ 標準庫異步編程示例(一)
- C++ TAP(基于任務的異步編程模式)
🚀 引言:異步編程的需求與挑戰
在現代軟件開發中,異步編程已成為提升應用性能的關鍵技術。C# 提供了優雅的 Task.WhenAll
和 Task.WhenAny
機制來管理多個異步任務,但 C++ 標準庫中缺乏直接等效功能。本文將深入探討兩種高效實現方案:
🔄 方案1:基于輪詢的簡單實現
🛠? when_all
實現(簡單輪詢)
#include <vector>
#include <future>template <typename T>
std::vector<T> when_all(std::vector<std::future<T>>& futures) {std::vector<T> results;for (auto& fut : futures) {results.push_back(fut.get());}return results;
}// void 特化版本
void when_all(std::vector<std::future<void>>& futures) {for (auto& fut : futures) {fut.get();}
}
原理說明
- 順序執行:循環遍歷每個 future,調用
get()
方法阻塞等待結果 - 結果收集:對于非 void 任務,結果存儲在 vector 中返回
- 優點:實現簡單,代碼直觀
- 缺點:順序等待導致性能瓶頸
🛠? when_any
實現(輪詢方式)
#include <chrono>
#include <vector>
#include <future>template <typename T>
size_t when_any(std::vector<std::future<T>>& futures) {while (true) {for (size_t i = 0; i < futures.size(); ++i) {if (futures[i].wait_for(std::chrono::seconds(0)) == std::future_status::ready) {return i;}}std::this_thread::sleep_for(std::chrono::milliseconds(10));}
}
原理說明
- 輪詢檢測:使用
wait_for(0)
非阻塞檢查任務狀態 - 指數退避:每次檢測后休眠減少 CPU 占用
- 返回索引:返回第一個完成任務的索引
- 缺點:CPU 占用高,響應延遲(最大10ms)
? 方案2:基于條件變量的高效實現
🧩 WhenAll
類設計(高效等待所有任務)
#include <vector>
#include <future>
#include <mutex>
#include <condition_variable>
#include <thread>class WhenAll {
public:void add_future(std::future<void> fut) {std::lock_guard<std::mutex> lock(mtx);futures.push_back(std::move(fut));count++;}void wait() {std::unique_lock<std::mutex> lock(mtx);if (count == 0) return;for (auto& fut : futures) {std::thread([&, this] {fut.wait();std::lock_guard<std::mutex> lock(mtx);if (--count == 0) cv.notify_all();}).detach();}cv.wait(lock, [this] { return count == 0; });}private:std::vector<std::future<void>> futures;std::mutex mtx;std::condition_variable cv;int count = 0;
};
架構解析
工作原理
- 添加任務:通過
add_future
添加異步任務 - 監控線程:為每個任務創建監控線程
- 條件等待:主線程等待條件變量
cv
- 完成通知:最后完成的任務觸發
notify_all()
- 資源釋放:監控線程自動分離(
detach
)
性能優勢
- 零輪詢:完全消除CPU空轉
- 即時響應:任務完成立即通知
- 線程安全:互斥鎖保護共享狀態
🧩 WhenAny
類設計(高效等待任意任務)
#include <vector>
#include <future>
#include <mutex>
#include <condition_variable>template <typename T>
class WhenAny {
public:template <typename Func>void add_task(Func func) {std::lock_guard<std::mutex> lock(mtx);futures.push_back(std::async(std::launch::async, [this, func] {auto result = func();{std::lock_guard<std::mutex> lock(mtx);if (!completed) {completed = true;completed_index = futures.size() - 1;cv.notify_all();}}return result;}));}size_t wait() {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [this] { return completed; });return completed_index;}std::future<T>& get_future(size_t index) {return futures[index];}private:std::vector<std::future<T>> futures;std::mutex mtx;std::condition_variable cv;bool completed = false;size_t completed_index = 0;
};
架構解析
工作流程
關鍵特性
- 泛型支持:模板化設計支持任意返回類型
- 一次性通知:
completed
標志確保只通知一次 - 結果獲取:
get_future
方法獲取已完成任務的結果 - 線程安全:互斥鎖保護共享狀態
🧪 使用示例與場景分析
基本使用示例
#include <iostream>
#include <chrono>
#include <thread>int main() {// 示例1: WhenAll 使用WhenAll wa;wa.add_future(std::async([] { std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "任務1完成\n";}));wa.add_future(std::async([] { std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << "任務2完成\n"; }));std::cout << "等待所有任務...\n";wa.wait();std::cout << "所有任務完成!\n";// 示例2: WhenAny 使用WhenAny<int> wany;wany.add_task([] {std::this_thread::sleep_for(std::chrono::seconds(3));return 100;});wany.add_task([] {std::this_thread::sleep_for(std::chrono::seconds(1));return 200;});std::cout << "等待任意任務完成...\n";size_t index = wany.wait();auto& fut = wany.get_future(index);std::cout << "任務" << index << "最先完成,結果: " << fut.get() << "\n";return 0;
}
實際應用場景
- 微服務聚合:并行調用多個微服務,等待所有響應
- 競態條件處理:多個數據源查詢,使用第一個返回結果
- 資源加載:并行加載多個資源文件,等待全部完成
- 超時處理:多個備用服務調用,使用最先響應的服務
🚀 高級優化與擴展
性能優化技術
-
線程池集成:避免為每個任務創建新線程
void wait(ThreadPool& pool) {std::unique_lock<std::mutex> lock(mtx);if (count == 0) return;for (auto& fut : futures) {pool.enqueue([&, this] {fut.wait();std::lock_guard<std::mutex> lock(mtx);if (--count == 0) cv.notify_all();});}cv.wait(lock, [this] { return count == 0; }); }
-
共享future優化:避免多次get()調用
void add_future(std::shared_future<void> shared_fut) {std::lock_guard<std::mutex> lock(mtx);shared_futures.push_back(shared_fut);count++; }
-
批量任務添加:減少鎖競爭
void add_futures(const std::vector<std::future<void>>& new_futures) {std::lock_guard<std::mutex> lock(mtx);futures.insert(futures.end(), new_futures.begin(), new_futures.end());count += new_futures.size(); }
功能擴展
-
超時支持:添加
wait_for
和wait_until
template <typename Rep, typename Period> bool wait_for(const std::chrono::duration<Rep, Period>& timeout) {std::unique_lock<std::mutex> lock(mtx);return cv.wait_for(lock, timeout, [this] { return count == 0; }); }
-
混合類型支持:使用
std::variant
class WhenAnyVariant { public:template <typename Func>void add_task(Func func) {using ResultType = decltype(func());std::lock_guard<std::mutex> lock(mtx);futures.push_back(std::async(std::launch::async, [=] {auto result = func();{std::lock_guard<std::mutex> lock(mtx);if (!completed) {completed = true;completed_index = futures.size() - 1;cv.notify_all();}}return std::variant<ResultType>(std::move(result));}));}// ... 其他成員 private:std::vector<std::future<std::variant<int, double, std::string>>> futures; };
-
進度追蹤:添加進度回調
void wait(std::function<void(int)> progress_callback) {std::unique_lock<std::mutex> lock(mtx);if (count == 0) return;for (auto& fut : futures) {std::thread([&, this] {fut.wait();std::lock_guard<std::mutex> lock(mtx);progress_callback(++completed_count);if (completed_count == count) cv.notify_all();}).detach();}cv.wait(lock, [this] { return completed_count == count; }); }
?? 注意事項與最佳實踐
異常處理
void add_future(std::future<void> fut) {std::lock_guard<std::mutex> lock(mtx);futures.push_back(std::async(std::launch::async, [fut = std::move(fut)]() mutable {try {fut.get();} catch (const std::exception& e) {std::cerr << "任務異常: " << e.what() << std::endl;}}));count++;
}
資源管理最佳實踐
- 避免線程泄漏:使用RAII包裝線程
- 預防死鎖:鎖粒度最小化
- 內存安全:使用智能指針管理共享數據
- 性能監控:添加任務執行時間統計
平臺適配性
- 跨平臺考慮:使用標準庫確保可移植性
- 編譯器支持:確保C++17及以上特性
- 異步模型:與平臺特定API(如IOCP/epoll)集成
📊 性能對比分析
方案類型 | CPU占用 | 響應延遲 | 內存開銷 | 適用場景 |
---|---|---|---|---|
簡單輪詢 | 高 (持續10-100%) | 高 (10ms級) | 低 | 任務少、低頻率 |
條件變量 | 低 (<1%) | 低 (μs級) | 中 | 高并發、實時系統 |
線程池集成 | 中 (可控) | 低 (μs級) | 高 | 大規模任務處理 |
🔮 未來發展與C++標準展望
C++23/26異步特性
- std::execution:標準執行器支持
- 協程增強:更簡潔的異步代碼編寫
- 網絡庫集成:與標準網絡庫協同工作
社區解決方案
- Boost.Asio:提供
async_wait_all
等實用工具 - Folly庫:Facebook的高性能異步工具集
- Qt Concurrent:跨平臺異步框架
💎 總結
本文詳細探討了在C++中實現 Task.WhenAll
和 Task.WhenAny
的兩種核心方案:
- 簡單輪詢方案:適用于輕量級場景,實現簡單但效率較低
- 條件變量方案:高性能實現,適用于生產環境
- 零輪詢設計減少CPU占用
- 即時響應確保最佳性能
- 擴展性強,支持超時、進度回調等高級功能
最佳實踐建議:
- 小型工具類使用簡單輪詢方案
- 高性能服務器使用條件變量方案
- 大規模并行處理集成線程池
- 始終考慮異常安全和資源管理