清晰理解 Amdahl’s Law(阿姆達爾定律),這是一條描述并行計算加速能力的核心定律。
定義公式:
S = 1 ( 1 ? P ) + P N S = \frac{1}{(1 - P) + \frac{P}{N}} S=(1?P)+NP?1?
- S S S:加速比(Speedup)
- P P P:程序中可以并行的比例(0 ≤ P ≤ 1)
- N N N:使用的處理器數量
解讀:
假設:
- 程序分為兩部分:
- 串行部分(1 - P):不能并行,只能在單核上運行;
- 并行部分(P):可在多核上同時運行。
示例:
假設程序有:
- 90% 的代碼可以并行(P = 0.9)
- 使用 4 核處理器(N = 4)
帶入公式:
S = 1 ( 1 ? 0.9 ) + 0.9 4 = 1 0.1 + 0.225 = 1 0.325 ≈ 3.08 S = \frac{1}{(1 - 0.9) + \frac{0.9}{4}} = \frac{1}{0.1 + 0.225} = \frac{1}{0.325} \approx 3.08 S=(1?0.9)+40.9?1?=0.1+0.2251?=0.3251?≈3.08
? 即使并行部分很多,也無法得到理想的4倍加速,因為還有10%是串行的。
結論與限制:
- 當 N → ∞ N \to \infty N→∞,加速上限變為:
S max = 1 1 ? P S_{\text{max}} = \frac{1}{1 - P} Smax?=1?P1? - 舉例:如果 P = 0.95 P = 0.95 P=0.95,那么最多加速到 20 倍,不管你有多少核!
實際意義:
- **強擴展性(Strong Scaling)**分析的重要工具。
- 提醒我們:優化串行部分同樣重要。
- 說明“更多核” ≠ “更高效”,受限于串行瓶頸。
import matplotlib.pyplot as plt
import numpy as np
# Define a range of processor counts
N = np.linspace(1, 64, 200) # from 1 to 64 processors
# Define different values of P (parallelizable portion)
P_values = [0.5, 0.75, 0.9, 0.95, 0.99]
# Create plot
plt.figure(figsize=(10, 6))
for P in P_values:S = 1 / ((1 - P) + P / N)plt.plot(N, S, label=f'P = {P}')
plt.title("Amdahl's Law: Speedup vs Number of Processors")
plt.xlabel("Number of Processors (N)")
plt.ylabel("Speedup (S)")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()
這張圖展示了 Amdahl’s Law 的核心含義:隨著處理器數量增加,程序加速比的提升會逐漸趨于上限,而這個上限由程序中串行部分的比例決定。
圖中要點說明:
- 每條曲線代表不同的并行比例 P P P(越接近 1 表示程序越適合并行化)
- 橫軸:處理器數量 N N N(1 到 64)
- 縱軸:加速比 S S S
觀察分析:
并行比例 P P P | 最大加速極限 lim ? N → ∞ S \lim_{N \to \infty} S limN→∞?S |
---|---|
0.5 | 1 1 ? 0.5 = 2 \frac{1}{1 - 0.5} = 2 1?0.51?=2 |
0.75 | 1 0.25 = 4 \frac{1}{0.25} = 4 0.251?=4 |
0.90 | 1 0.10 = 10 \frac{1}{0.10} = 10 0.101?=10 |
0.95 | 1 0.05 = 20 \frac{1}{0.05} = 20 0.051?=20 |
0.99 | 1 0.01 = 100 \frac{1}{0.01} = 100 0.011?=100 |
Rule 1:盡可能地并行化程序(Parallelize as Much as Humanly Possible)
這是最基本的并行編程原則:
“盡你所能,并行化!”
原因在于,并行計算的性能收益很大程度上取決于你能有效地將工作拆分成可以同時執行的子任務(參見 Amdahl’s Law)。
四大災難(The 4 Horsemen of the Apocalypse)
1. Starvation(饑餓)
并發工作不足,導致資源利用率低。
- 比如你有 8 核 CPU,但只生成了 2 個任務去運行。
- 其余 6 個核處于閑置狀態 → 浪費計算資源。
** 解法:** 更細粒度的任務劃分、更好的負載均衡。
2. Latency(延遲)
因資源訪問距離遠導致的響應延遲。
- 比如數據在遠程服務器或其他 NUMA 節點上。
- 即使并行了,訪問遠程內存或 I/O 的延遲也會成為瓶頸。
** 解法:** 預取、局部性優化(locality optimization)、async I/O、future/promise。
3. Overheads(開銷)
管理并行任務/資源帶來的額外負擔。
- 比如線程創建/銷毀、任務調度、同步、內存分配等。
- 有時這些管理代碼在“關鍵路徑”上,可能比串行還慢。
** 解法:** 線程池、task-stealing、避免過度切分任務。
4. Contention(資源爭用)
多個線程/任務競爭同一資源導致的等待。
- 比如多個線程鎖住同一個隊列、文件、數據庫連接等。
- 這會讓任務阻塞,降低并行效率。
** 解法:** - 避免共享(no sharing is best sharing)
- 使用 lock-free 數據結構、分片(sharding)、讀寫鎖
總結口訣:
并行是加速的源泉,但四騎士是效率的噩夢。
現實中并行編程常見的一些深層次結構性問題。這些問題不僅僅是代碼寫得對不對,而是并行模型本身的局限性。我們來一一理解:
現實問題解析(Real-world Problems)
1. 并行性不足(Insufficient parallelism imposed by the programming model)
即使硬件足夠強大,編程模型本身可能強制你“等別人”,導致資源閑置:
- OpenMP:
#pragma omp parallel for
在每次并行循環后都有一個隱式 barrier(柵欄)→ 所有線程必須等到最慢的線程完成才能繼續。 - MPI:每一步模擬后,通常需要一個**全局同步通信(barrier)**來更新狀態。
本質問題:強同步點,阻礙了持續并行化。
2. 過度同步(Over-synchronization)
并不是算法要求同步這么多,而是編程模型(或粗暴實現)同步了太多不該同步的東西。
- 在 MPI 中,不同節點(rank)往往會因為邏輯寫法或者慣性思維,被迫走“鎖步”邏輯(step lock-step):
Rank 0 send → Rank 1 recv → all wait
結果:并發資源未被充分利用。最快的線程/節點被最慢的拖著走。
3. 缺乏協調(Insufficient coordination between on-node and off-node parallelism)
即使你寫了并行代碼,也常常只針對一部分層級(on-node / off-node):
- MPI 管遠程節點(off-node)
- OpenMP 管本地線程(on-node)
- 加速器用 CUDA / SYCL 管 GPU
如果你這三者之間缺乏統一協調和協同優化,就很難實現真正高效的混合并行(hybrid parallelism)。
4. 并行模型割裂(Distinct programming models for different types of parallelism)
這點尤為痛苦:
- 節點間通信:MPI
- 節點內線程:OpenMP / std::thread
- GPU 并行:CUDA / OpenCL / SYCL
問題在于:你必須使用完全不同的 API、語義和調試工具鏈,幾乎像寫三套程序一樣,非常容易出錯、難以維護、難以調優。
總結建議:
這些問題說明了傳統并行模型的局限,也解釋了為什么:
- 高性能計算(HPC)程序維護困難;
- 跨平臺 / 異構計算開發成本高;
- 新興的異步并行模型(如 HPX, Kokkos, RAJA, SYCL)正在興起。
如果你正在學習并行計算或實際工程中需要設計并行架構,可以優先考慮: - 減少強同步點
- 最小化共享狀態
- 使用任務并行而非僅僅線程并行
- 學習統一模型如 HPX、SYCL 或 C++ Parallelism TS
并行編程中一個非常重要的限制性模式:Fork/Join 并行模型,以及它在 C++17 中的標準并行算法中的影響。
什么是 Fork/Join 并行模型?
Fork/Join 是一種非常經典、簡單的并行執行模式,流程如下:
- Fork(分叉):將一個任務拆分成多個子任務,并行執行;
- Join(匯合):所有子任務執行完之后,主線程等待它們結束,然后繼續執行后續邏輯。
示例(偽代碼):
parallel_for(0, N, [](int i) {do_work(i);
});
// 等所有 do_work(i) 完成之后,才繼續后續代碼
為什么 Fork/Join 模型有問題?
1. 強制同步點(Join)影響性能
即使某些子任務已經完成,也不能繼續執行下一階段,必須等所有任務結束。
- 類似 OpenMP
#pragma omp parallel for
中的隱式 barrier。 - C++17
std::execution::par
也是類似模式。
2. 不支持異步流水線(pipelining)
Fork/Join 不支持“一個任務的結果立即傳遞給下一個”,缺少流式/事件驅動模型的靈活性。
C++17 中的情況:
雖然 C++17 增加了標準并行算法:
std::for_each(std::execution::par, vec.begin(), vec.end(), func);
但是這些都是 Fork/Join 模式!
- 算法會將
vec
拆分并發執行; - 一旦調用結束,意味著所有并發任務都執行完成了。
所以,它雖然并行了,但仍然是“同步完成”的。
與現代異步模型的對比(如 HPX):
- Fork/Join:結構簡單,但強同步。
- Asynchronous Task Graph(異步任務圖):如 HPX/TBB/async/
co_await
- 支持任務依賴、懶執行、延遲調度
- 沒有全局同步點,吞吐量更高,延遲更低
總結
模型 | 優點 | 缺點 |
---|---|---|
Fork/Join | 實現簡單,語義清晰 | 有全局同步點,性能瓶頸 |
異步任務圖 | 高吞吐,支持流水線 | 實現更復雜,調試更困難 |
如果你希望更深入了解“如何用現代 C++ 編寫異步/事件驅動的并行代碼”,我可以: |
- 演示
std::async
/std::future
- 講解
HPX
、unifex
、std::execution
提案 - 或者對比不同并行庫(OpenMP vs TBB vs HPX)
Rule 2:使用支持 SLOW 的編程環境
SLOW = Small, Light-weight Overhead Work
→ 也就是說,我們需要一個支持極小、極輕量任務的并行運行時。
思維實驗:Overheads vs. Grain Size
你提到的圖展示的是:
在不同粒度(每個線程的工作量)下,不同開銷(每個線程的開銷)對總執行時間的影響
- 小任務粒度 + 低開銷環境 → 最優擴展性
- 任務過多時(比如上千萬):
- 無法創建那么多系統線程(kernel threads)
- 調度和同步成本極高
- 難以編寫和維護
面臨的問題:
問題 | 說明 |
---|---|
無法使用那么多 pthread | 系統線程數量有限,內核調度開銷大 |
無法管理上千萬個任務 | 思維負擔大,bug 容易產生 |
需要任務調度抽象機制 | 必須引入運行時調度器,例如 HPX/TBB/task system |
解決方案:使用任務抽象系統
支持 SLOW 的并行環境應該提供:
- 輕量級任務調度(例如用戶級線程、fiber、coroutine)
- 任務竊取(work stealing)調度算法
- 惰性執行 / 延遲調度(減少無用調度)
- 自動粒度控制(例如 HPX 的
adaptive_chunk_size
)
推薦工具(支持 SLOW 的環境):
工具/庫 | 特性 |
---|---|
HPX | 真正異步、基于 futures、細粒度調度 |
Intel TBB | 自動任務粒度控制 + work stealing |
OpenMP Task | 基本任務調度,但缺乏高級抽象 |
std::async / std::execution | C++ 標準支持,仍偏粗粒度 |
總結:
- 傳統的線程 = 粗粒度、高開銷,不適合千萬級任務
- 現代異步并行編程必須支持大量小任務
- C++ 開發中,應選擇具有用戶級任務調度器的框架來支持高并發下的高效執行
C++ 中的 future
(未來值) 概念及其在異步編程中的作用。以下是詳細解釋和理解:
什么是 future?
future 是一個代表“尚未計算出結果”的對象。
換句話說,它是一個“占位符” —— 將來某個時候會有值,但現在你還不能訪問它。
#include <future>
#include <iostream>
int universal_answer() { return 42; }
void deep_thought() {std::future<int> promised_answer = std::async(&universal_answer);// ...做其它事情(比如思考750萬年)std::cout << promised_answer.get() << std::endl; // 會等待結果并輸出 42
}
特性總結
特性 | 說明 |
---|---|
透明地同步 | future.get() 會自動等待結果準備好(不需要顯式管理線程)。 |
隱藏線程細節 | 不需要自己啟動、管理或 join 線程,std::async 會幫你處理。 |
便于管理異步任務 | 可以把多個異步任務組合在一起。 |
使并發更易轉化為并行 | 因為你聲明了“我需要這個值”,系統可以選擇在另一個線程中并行計算它。 |
圖示邏輯(解釋幻燈片中圖):
Locality 1(主線程/消費者)
- 創建
future
并觸發async
- 如果你調用
.get()
但結果未就緒,消費者線程會掛起等待 - 等到有結果時被喚醒繼續執行
Locality 2(工作線程/生產者) - 在后臺線程計算結果
- 完成后把結果交給
future
總結
std::future
是 C++ 異步編程的基礎設施之一,能有效:
- 異步啟動任務(通過
std::async
等) - 延遲或等待結果(通過
.get()
) - 降低并發編程難度(無需直接操作線程)
你可以把它當成“一張票” —— 將來憑票可領取某個值,現在可以先做別的事。
遞歸并行(Recursive Parallelism)的代碼示例。
代碼示例說明:
T tree_node::traverse() {if (has_children()) {std::array<T, 8> results; // 假設每個節點有8個子節點for (int i = 0; i != 8; ++i)results[i] = children[i].traverse(); // 遞歸遍歷每個子節點return combine_results(results, compute_result());}return compute_result(); // 如果是葉子節點,直接計算結果并返回
}
- 遞歸遍歷樹結構:
該函數是對樹的遞歸遍歷。每個節點調用自己的traverse()
方法,來處理子節點的計算。 - 判斷是否有子節點:
has_children()
判斷當前節點是否有子節點。如果有,進入遞歸過程。 - 對子節點遞歸調用:
用一個std::array
存放8個子節點的結果,遍歷每個子節點并遞歸調用traverse()
。 - 葉子節點處理:
如果當前節點沒有子節點(葉子節點),就直接調用compute_result()
計算并返回該節點的結果。 - 合并結果:
當所有子節點都遞歸完成后,將子節點的結果數組results
與當前節點自身的計算結果compute_result()
合并,得到當前節點的最終結果。
遞歸并行(Recursive Parallelism)的核心思想:
- 代碼示例里,子節點的遞歸調用寫成了順序執行,但實際上可以把對8個子節點的遞歸調用并行執行,也就是說:
- 8個
children[i].traverse()
調用可以同時啟動,互不阻塞。 - 等待所有子節點的結果都完成后,再合并計算。
- 8個
- 這種方式充分利用多核或分布式系統的并行計算能力,加速樹結構的遍歷和計算。
總結:
- 遞歸遍歷樹:從根節點開始,對每個節點調用
traverse()
。 - 葉子節點直接計算:沒有子節點的節點直接計算結果。
- 中間節點合并:有子節點時,先遞歸獲取所有子節點的結果,然后合并這些結果并加上自己計算的結果。
- 遞歸并行:可以讓子節點遞歸調用并行執行,提升性能。
這段使用異步 future
實現遞歸樹遍歷的代碼添加詳細注釋。
T traverse(tree_node const &t)
{if (t.has_children()) {// 用future數組保存8個子節點的異步任務結果std::array<std::future<T>, 8> results;// 對8個子節點,分別異步調用traverse遞歸遍歷for (int i = 0; i != 8; ++i)results[i] = std::async(traverse, t.children[i]);// 計算當前節點自身的結果T r = t.compute_result();// 等待所有異步任務完成,確保子節點結果已準備好wait_all(results);// 把所有子節點結果和當前節點結果合并后返回return t.combine_results(results, r);}// 葉子節點,直接計算并返回結果return t.compute_result();
}
代碼逐步理解
- 判斷是否有子節點
if (t.has_children())
判斷當前節點是否是內部節點(有子節點)。 - 異步啟動子節點遞歸調用
- 使用
std::future<T>
數組保存異步任務。 std::async(traverse, t.children[i])
異步調用traverse
函數遍歷第i
個子節點,立即返回future
。- 這樣8個子節點的遍歷操作幾乎同時開始,形成真正的并行。
- 使用
- 計算當前節點自身結果
在等待子節點完成前,先計算當前節點自身的結果,利用計算資源。 - 等待所有子節點任務完成
wait_all(results);
等待所有future
對象完成,確保子節點的計算結果可用。 - 合并結果返回
把所有子節點的結果results
和當前節點自身的結果r
一起傳給combine_results()
,得到當前節點的最終結果。 - 葉子節點直接計算
如果沒有子節點,直接計算當前節點結果并返回。
總結:
- 這段代碼充分利用
std::async
實現遞歸并行,每個節點的子節點遞歸遍歷可以并發執行。 - 利用異步任務,避免阻塞,提升樹遍歷效率。
- 代碼清晰表達了遞歸并行模型:先異步發起子任務,再計算自身,最后等待合并。
這個代碼片段是在實現遞歸樹遍歷的異步版本,用到了一些更高級的異步組合技術,比如 when_all
和 then
,它比之前的 wait_all
更加符合現代異步編程模型,支持鏈式調用和更靈活的結果處理。
代碼解釋與理解
future<T> traverse(tree_node const &t)
{if (t.has_children()) {// 用future數組保存8個子節點的異步遍歷任務std::array<future<T>, 8> results;// 異步遞歸調用遍歷每個子節點for (int i = 0; i != 8; ++i)results[i] = async(traverse, t.children[i]);// 這里不是阻塞等待,而是把子節點的異步結果和當前節點計算結果組合起來// 先把所有子節點的future合并成一個future(當所有子節點完成時觸發)// 再把當前節點的計算結果 t.compute_result() 也作為參數傳入return when_all(results, t.compute_result()).then(// 當所有子節點完成并拿到當前節點的結果后,調用combine_results進行合并[](auto f, auto r) { return combine_results(f, r); });}// 葉子節點,直接返回計算結果(future封裝的結果)return t.compute_result();
}
關鍵點解析:
- 返回值變成
future<T>
表示這個函數是異步的,調用它不會立即得到結果,而是返回一個future
,未來某個時間點可以得到計算結果。 async(traverse, t.children[i])
異步啟動子節點的遞歸遍歷,返回對應的future<T>
。when_all(results, t.compute_result())
- 這是一個組合函數,等待所有
results
中的future
完成(即所有子節點的遍歷結束),同時也把當前節點的compute_result()
作為參數一并傳入。 - 返回一個新的
future
,它會在所有子任務和當前節點計算都完成時被觸發。
- 這是一個組合函數,等待所有
.then(...)
- 這是一個回調,
when_all
返回的future
完成后會調用這個函數。 - 參數
(auto f, auto r)
分別代表子節點的結果集合和當前節點的計算結果。 - 調用
combine_results(f, r)
來合并所有結果。
- 這是一個回調,
- 葉子節點
如果沒有子節點,直接返回當前節點計算的結果,這里應該返回的是future<T>
,也就是說compute_result()
本身返回的是future<T>
,或者會被自動包裝成future<T>
。
這樣做的好處:
- 純異步,非阻塞,調用者可以繼續做其他事情,等待最終結果。
- 使用了任務組合(
when_all
)和回調鏈式操作(then
),代碼簡潔且符合現代異步編程范式。 - 充分利用多核并行能力,樹的每個子樹都并行計算。
- 避免了顯式的等待(
wait_all
),更靈活。
這段代碼用到了 C++20 協程(coroutine) 的 co_await
和 co_return
,實現了遞歸樹的異步遍歷
future<T> traverse(tree_node const &t)
{if (t.has_children()) {std::array<future<T>, 8> results; // 8個子節點的future數組for (int i = 0; i != 8; ++i)results[i] = async(traverse, t.children[i]); // 異步遞歸遍歷每個子節點// co_await等待所有子節點結果,co_await等待當前節點計算結果,然后combineco_return t.combine_results(co_await results, co_await t.compute_result());}// 葉子節點,直接co_return當前節點計算結果co_return t.compute_result();
}
逐行解析和理解
- 函數返回類型是
future<T>
,且用到了協程關鍵字co_await
和co_return
,表明這是一個協程函數,支持異步等待。 if (t.has_children()) {
判斷當前節點是否有子節點。std::array<future<T>, 8> results;
創建一個長度為8的future
數組,用于存放8個子節點的異步計算結果。for (int i = 0; i != 8; ++i) results[i] = async(traverse, t.children[i]);
異步啟動8個子節點的traverse
調用,每個返回一個future<T>
。co_await results
這里假設可以對std::array<future<T>, 8>
做co_await
,它會等待所有子節點的異步任務完成,并得到所有結果。co_await t.compute_result()
等待當前節點自身的異步計算結果。t.combine_results(...)
把子節點們的結果和當前節點的計算結果合并。co_return
返回最終結果。- 葉子節點(沒有子節點)直接調用
co_return t.compute_result();
返回當前節點結果。
核心理解:
- 遞歸調用是異步的,每個子節點都通過
async
異步啟動traverse
,所有子任務同時進行,充分利用并行計算能力。 co_await
用來異步等待這些子任務完成,無需阻塞線程,提升效率。co_return
用來返回協程最終結果,包裝為future<T>
。- 代碼邏輯清晰,簡潔且符合現代 C++ 協程編程風格。
額外說明:
- 注意:標準 C++ 并沒有直接支持對
std::array<future<T>, 8>
使用co_await
,通常需要實現一個能等待所有子future
完成的輔助函數,比如類似when_all
的協程適配版本。 t.compute_result()
也必須返回future<T>
,或者能被co_await
,即異步計算。- 這個寫法非常適合于異步任務圖形化展開,適合樹形結構的高效異步遍歷。
如果需要,我可以幫你寫一個co_await
支持等待std::array<future<T>, 8>
的輔助函數示例,或者一個完整的、可運行的異步遞歸遍歷協程示例。你覺得怎么樣?
這段代碼加上詳細注釋,并解釋代碼的整體流程和協程機制的工作原理:
#include <coroutine>
#include <exception>
#include <iostream>
// task<T> 是一個協程返回類型,封裝了協程句柄和協程狀態
template <typename T>
struct task {struct promise_type; // promise_type 必須聲明,用于協程框架using handle_type = std::coroutine_handle<promise_type>;// promise_type 定義了協程的生命周期和狀態管理struct promise_type {T value_; // 用來存儲協程返回值std::exception_ptr exception_; // 用來存儲異常// 返回協程句柄封裝的 task 對象task get_return_object() { return task{handle_type::from_promise(*this)};}// 協程啟動時不掛起,立即運行協程體std::suspend_never initial_suspend() { return {}; }// 協程結束時掛起(掛起等待外部顯式銷毀),保證協程狀態有效std::suspend_always final_suspend() noexcept { return {}; }// 如果協程中拋出異常,會捕獲并保存到 exception_void unhandled_exception() { exception_ = std::current_exception(); }// 協程通過 co_return 返回值時調用,保存結果void return_value(T v) { value_ = v; }};handle_type coro_; // 協程句柄,指向協程狀態// 構造函數:接受協程句柄explicit task(handle_type h) : coro_(h) {}// 析構時銷毀協程,釋放資源~task() {if (coro_) coro_.destroy();}// awaiter 接口:await_ready 返回 true 表示協程結果已經準備好,不掛起bool await_ready() noexcept { return true; }// await_suspend 不會被調用,因為 await_ready 已經返回 truevoid await_suspend(std::coroutine_handle<>) noexcept {}// await_resume 返回協程結果或重新拋出異常T await_resume() {if (coro_.promise().exception_) std::rethrow_exception(coro_.promise().exception_);return coro_.promise().value_;}
};
// 一個模擬異步計算的協程,直接返回傳入的值
task<int> compute_result(int v) { co_return v;
}
// 遞歸協程函數,計算深度值的和
task<int> traverse(int depth) {if (depth == 0) {// 葉子節點,直接等待計算結果(這里直接返回 depth)int res = co_await compute_result(depth);co_return res;} else {// 非葉子節點,遞歸計算左右子樹int left = co_await traverse(depth - 1);int right = co_await traverse(depth - 1);// 計算當前節點的值int self = co_await compute_result(depth);// 返回所有結果的和co_return left + right + self;}
}
int main() {auto t = traverse(3); // 調用遞歸協程// 直接同步等待并獲取結果,輸出std::cout << "Result: " << t.await_resume() << "\n";
}
depth=3/ \depth=2 depth=2/ \ / \depth=1 depth=1 depth=1 depth=1/ \ / \ / \ / \
d=0 d=0 d=0 d=0 d=0 d=0 d=0 d=0
代碼執行流程和協程機制解釋
- 調用
traverse(3)
:- 生成一個協程狀態(棧幀和相關數據結構),返回一個
task<int>
對象,包含協程句柄。 - 因為
initial_suspend()
返回std::suspend_never
,協程立即開始執行。
- 生成一個協程狀態(棧幀和相關數據結構),返回一個
- 遞歸執行:
- 如果
depth != 0
,會遞歸調用traverse(depth-1)
兩次,分別生成兩個子協程。 - 每個子協程也是立即執行,直到
depth==0
,葉子節點調用compute_result(0)
。
- 如果
compute_result
協程:- 只是一個簡單的協程,直接用
co_return v;
返回輸入值。 co_await compute_result(...)
表示等待異步計算完成,因await_ready()
返回true
,不會掛起,立即返回結果。
- 只是一個簡單的協程,直接用
- 協程間的等待(
co_await
):- 因為
await_ready()
返回true
,所以每次co_await
都是同步執行。 - 子協程完成后會將結果返回給調用方。
- 因為
- 返回和結果累積:
- 每層
traverse
調用將子樹計算結果相加,再加上當前節點的計算結果,co_return
返回總和。
- 每層
main()
里的同步獲取結果:- 調用
await_resume()
,直接獲得最終結果(深度為3的樹的所有節點值和)。 - 打印輸出。
- 調用
- 協程銷毀:
- 因為
final_suspend()
返回std::suspend_always
,協程結束時會掛起,保持狀態有效。 task
對象析構時調用coro_.destroy()
釋放協程資源。
- 因為
重點理解點
- 協程句柄 (
std::coroutine_handle
) 是對協程狀態的引用,負責啟動、掛起、恢復和銷毀協程。 - promise_type 定義了協程的生命周期控制、返回值保存、異常處理等。
initial_suspend()
和final_suspend()
控制協程開始和結束時是否掛起。co_await
通過 awaiter 接口決定是否掛起協程,何時恢復,如何獲取結果。- 這段代碼實現了一個同步遞歸異步執行的示例,異步用協程機制實現,實際不會并行,因為所有
co_await
立即返回結果。
如果你想實現真正的異步并行遞歸,還需要在await_ready()
返回false
并實現異步調度邏輯(比如事件循環、線程池調度等)。這段代碼主要演示了協程語法和遞歸調用的配合,邏輯清晰,容易理解。
1. 異步通道(Asynchronous Channels)是什么?
- 高層通信抽象
異步通道封裝了通信的操作細節,是通信的一種高級抽象。它讓程序員不用關注底層的細節,而專注于數據的發送和接收。 - 特別適合異步邊界交換
在并行計算中,不同計算節點之間常常需要交換數據。異步通道特別適合這種跨計算單元、異步的通信場景。 - 仿照 Go 語言的通道(Go-channels)設計
Go 語言的通道是一種很流行的并發編程模型,允許不同協程之間通過通道異步傳遞數據。這里借鑒了這種設計理念。
2. 跨“locality”通信
- “Locality”是什么?
這里的 locality 指的是計算資源的局部范圍,比如一個節點、一個核、或者一個進程。 - 通道可以在一個 locality 創建,在另一個 locality 訪問
這意味著可以在不同計算節點間通過通道傳遞消息,實現分布式通信。 - 類似點對點雙向通信(P2P communicators)
這和 MPI(消息傳遞接口)中的點對點通信概念類似,但這里更抽象,更異步。
3. 異步特性
- channel::get() 和 channel::set() 返回 futures
這兩種操作分別是從通道讀取和向通道寫入數據。它們返回 future,意味著調用者可以在等待數據準備好或發送完成期間繼續執行其他任務,真正實現了異步非阻塞操作。
總結
異步通道是一個高級的異步通信機制,靈感來源于 Go 語言通道,允許在分布式或并行環境中以異步方式交換數據。它將通信操作封裝成返回 futures 的異步接口,方便并行任務高效協作。
future的二維模板(2D stencil)計算的主循環和單步操作。下面我幫你拆解理解關鍵點。
1. Futurized 2D Stencil: 主循環 (Main Loop)
hpx::future<void> step_future = make_ready_future();
for (std::size_t t= 0; t != steps; ++t)
{step_future = step_future.then([t](hpx::future<void> &&){return perform_one_time_step(t);});
}
step_future.get(); // 等待所有步驟完成
make_ready_future()
創建一個已經完成的 future,作為初始依賴。- 每次循環中,調用
step_future.then(...)
,將上一次步驟的完成作為下一步執行的前提。 - 這樣構造一個異步的執行鏈,每一步執行完后,才執行下一步。
- 最后用
step_future.get()
阻塞等待所有步驟結束。
2. 單步執行:更新邊界(Update Boundaries)
hpx::future<void> partition::perform_one_time_step(int t)
{// 從鄰居處異步獲取上邊界數據hpx::future<void> top_boundary_future= channel_up_from.get(t).then([](hpx::future<std::vector<double>> && up_future){std::vector<double> data = up_future.get(); // 不阻塞,直接獲取結果// 處理“幽靈區”(ghost-zone)數據// 向鄰居發送更新后的幽靈區數據return channel_up_to.set(data);});// 下面還會處理底部、左右邊界以及內部區域(見下一節)
}
- 通過異步通道
channel_up_from.get(t)
從上鄰居獲取數據,得到一個 future。 - 用
.then()
注冊回調,回調中處理數據并異步發送給上鄰居。 - “幽靈區”是并行計算中鄰居分區邊界的冗余數據,用于保證計算正確。
- 該過程全部異步執行,不阻塞主線程。
3. 單步執行:內部計算(Interior)
hpx::future<void> interior_future =hpx::parallel::for_loop(par(task), min+1, max-1, [](int idx){// 對每個內部點應用 stencil 操作});
- 使用 HPX 的并行循環
parallel::for_loop
并行地對內部(非邊界)點執行 stencil 計算。 - 該操作本身返回一個 future,表示并行計算的完成。
4. 單步執行:匯合(Wrap-up)
return when_all(top_boundary_future, bottom_boundary_future,left_boundary_future, right_boundary_future,interior_future);
- 將所有邊界異步操作和內部計算的 futures 合并成一個 future,只有當所有操作完成時該 future 才完成。
- 這樣保證一步計算的所有部分都完成。
5. Futurization 技術簡介
- Futurization 是一種自動將同步代碼轉換成異步“未來式(futurized)”代碼的技術。
- 通過延遲實際執行,避免不必要的同步阻塞。
- 它將原本順序執行的代碼轉成一個代表算法依賴關系的執行樹。
- 執行該執行樹會獲得與原始代碼相同的結果,但執行速度會更快(因為最大限度并行)。
6. Futurization 的示意對比
直寫代碼 (Straight Code) | Futurized 代碼 (Futurized Code) |
---|---|
T func() { ... } | future<T> func() { ... } |
T n = func(); | future<T> n = func(); |
future<T> n = async(&func, ...); | |
future<future<T>> n = async(&func, ...); * |
- Futurized 代碼中,返回類型變為
future<T>
,表明結果是異步獲得的。 - 直接執行代碼轉變為構建異步任務和依賴的過程。
- 嵌套的
future<future<T>>
會自動展開成future<T>
,方便使用。
總結
- 主循環中,利用 future.then() 構建依賴鏈,串行執行多個時間步的異步計算。
- 每個時間步,分成異步邊界通信和并行內部計算,并用 when_all() 合并完成狀態。
- Futurization 技術幫助從直寫同步代碼無縫轉到高效異步執行,利用并行計算和通信最大化性能。