Rust入門之并發編程基礎(三)
題記:6月底7月初,結束北京的工作生活回到二線省會城市發展了,鴿了較久了,要繼續堅持學習Rust,堅持寫博客。
背景
我們平時使用計算機完成某項工作的時候,往往可以同時執行多個任務,比如可以編程的時候同時播放音樂,就算是單核CPU也是如此。這是因為現代計算機操作系統會使用**“中斷機制”**來執行任務,任務可以分為:
- “CPU 密集型“或者“計算密集型”
- ”IO 密集型“
根據這兩種類型,又可以有針對性的利用操作系統的”中斷機制“提供計算機同時執行多任務的效率。大多數函數調用都是會發生阻塞的,等待當前執行完成才會繼續執行后續的動作,如果在一個程序中有多個任務,多個任務中某些任務阻塞時可以不影響其他任務執行,也就是異步執行多個任務,這樣效率就會提高很多了。多線程異步執行程序就顯得尤為重要了。
討論一下并發和并行
比如給員工分配任務,如果分配給一個員工多個任務,這個員工在任務一個任務完成前同時處理多個任務,這就類似計算機操作系統的中斷機制,這就是并發。
當將多個任務在多個員工中分配執行,每個員工分配一個任務并單獨處理它,這就是并行。每個組員可以真正的同時進行工作。
如果一個任務執行必須依賴另一個任務,那么任務之間必須串行的執行,一個處理了再處理下一個任務。并行和并發也會發生相互交叉(阻塞)如果某個程序中的某幾個任務的并發執行都需要等待另一個任務的完成,可能就會集中時間做這個任務,那么就都無法并行工作了。
并發 VS 并行
特征 | 并發 | 并行 | 串行 |
---|---|---|---|
執行方式 | 任務交替執行 | 任務同時執行 | 順序執行 |
類比模型 | 單員工輪流處理多個任務 | 多員工各處理單個任務 | 單任務完成后才能開始下一個 |
資源需求 | 單核或多核 | 多核必需 | 單核或多核 |
執行時機 | 任務可能重疊 | 任務真正同時 | 任務順序執行 |
同樣的基礎動作也作用于軟件和硬件。在單核CPU的計算機上,CPU一次只能執行一個操作,不過它仍然是并發工作,借助像線程,進程和異步(async)等工具,計算機可以暫停一個活動,并在最終切換回第一個活動執行之前切換到其它活動。在一個多核CPU的計算機上,它也可以并行工作,一個核心執行一個任務,同時另一個核心可以執行其他不相關的工作,而且這些工作實際上是同時發生的。
Rust 異步編程主要處理并發性,取決于硬件、操作系統和所使用的異步運行時(async runtime),并發也有可能在底層使用了并行。下面將詳細的討論Rust 異步編程是如何工作的。
Rust 異步編程的核心組件:future、async、await
Rust 異步編程的三個重要關鍵元素:
- futures
async
關鍵字await
關鍵字
future 是一個現在還沒準備好,未來會返回結果的一個值。類似Java 語言中也有類似的Future概念。Rust提供了 Future trait
作為基礎組件。
async
關鍵字可以用于代碼塊或函數,表明它們可以被中斷或恢復。在一個async
塊或者 async
函數中,可以使用 await
關鍵字來等待一個 future
準備就緒,這個過程稱之為等待一個 future
。每一個等待future
的地方都可能是一個async
塊或async
函數中斷并隨后恢復的點。檢查一個future
并查看其值是否準備就緒的過程被稱之為輪詢(polling)。
future 的特點:
- Rust編譯器將
async/await
代碼轉換為使用Future trait
的等效代碼- 類似 for 循環被轉換為使用
Iterator trait
- 類似 for 循環被轉換為使用
- 開發者可以為自定義數據類型實現
Future trait
- 提供統一的接口但允許實現不同的異步操作實現
Rust 官方為了大家學習實驗異步操作,創建了一個 trpl
crate(trpl
是 “The Rust Programming Language” 的縮寫)。它重導出了你需要的所有類型、traits 和函數,它們主要來自于 futures
和 tokio
crates。
futures
crate 是一個 Rust 異步代碼實驗的官方倉庫,也正是Future
最初設計的地方。- Tokio 是目前 Rust 中應用最廣泛的異步運行時(async runtime),特別是(但不僅是!)web 應用。這里還有其他優秀的運行時,它們可能更適合你的需求。我們在
trpl
的底層使用 Tokio 是因為它經過了充分測試且廣泛使用。
接下來上代碼,利用 trpl
提供的多種組件來編寫第一個異步程序。我們構建了一個小的命令行工具來抓取兩個網頁,拉取各自的 <title>
元素,并打印出第一個完成全部過程的標題。先創建一個rust項目,添加 trpl
庫。
Cargo.toml:
[package]
name = "hello-async"
version = "0.1.0"
edition = "2021"[dependencies]
trpl = "0.2.0"
main.rs:
use trpl::Html;
use trpl::Either;async fn page_title(url: &str) -> (&str, Option<String>) {// 傳入的任意 URL,使用 await 等待響應,因為Rust的futures是惰性的,只有調用await時,才會執行異步操作let response = trpl::get(url).await;let response_text = response.text().await;let title = Html::parse(&response_text).select_first("title").map(|title_element| title_element.inner_html());(url, title)
}fn main() {// 接收參數,兩個參數分別是兩個URLlet args: Vec<String> = std::env::args().collect();trpl::run(async {let title_fut_1 = page_title(&args[1]);let title_fut_2 = page_title(&args[2]);let (url, maybe_title) =match trpl::race(title_fut_1, title_fut_2).await {Either::Left(left) => left,Either::Right(right) => right,};println!("{url} returned first");match maybe_title {Some(title) => println!("Its page title is: '{title}'"),None => println!("Its title could not be parsed."),}});
}
async
修飾 page_title 函數,說明這個函數是一個異步函數。trpl::get(url)
去調用url地址返回響應,這里需要等待時間,這個函數也是用 async
修飾了表示它也是一個異步函數并返回future,這里加上await
,表示我們要等待這個future 返回響應。同樣response.text()
也是異步的,這里也使用 await
等待返回結果。 響應文本拿到后再使用Html::parse
解析。
這里要注意因為Rust的futures是惰性的,只有調用await時,才會執行異步操作,然后這里也可以改為鏈式調用,讓代碼更加簡潔。
page_title
這個函數使用了async
修飾,當函數使用async
的時候,就會將函數轉換為返回Future
的普通函數。
這個示例分別由用戶提供的 URL 調用 page_title
開始。我們將調用 page_title
產生的 future 分別保存為 title_fut_1
和 title_fut_2
。請記住,它們還沒有進行任何工作,因為 future 是惰性的,并且我們還沒有 await
它們。接著我們將 futures 傳遞給 trpl::race
,它返回一個值表明哪個傳遞的 future 最先返回。
并發與async
使用異步編程解決一些并發問題,這里更多關注線程與future的區別。
代碼示例:
use std::time::Duration;fn main() {trpl::run(async {trpl::spawn_task(async {for i in 1..10 {println!("hi numnber {i} from the first task!");trpl::sleep(Duration::from_millis(500)).await;}});for i in 1..5 {println!("hi number {i} from the second task!");trpl::sleep(Duration::from_millis(500)).await; }});
}
執行結果:
hi number 1 from the second task!
hi numnber 1 from the first task!
hi number 2 from the second task!
hi numnber 2 from the first task!
hi number 3 from the second task!
hi numnber 3 from the first task!
hi number 4 from the second task!
hi numnber 4 from the first task!
hi numnber 5 from the first task!
根據執行結果可以看出。first task 在 second task 執行結束后也停止了,這是因為主任務(second task)已經停止,在主任務中創建的異步任務(first task)也會停止。如果要運行first task 直到結束,就需要一個join
(join handle)來等待第一個任務完成。對于線程來說,可以使用join
方法來阻塞直到線程結束運行。在這里可以使用await
達到相同的效果。
添加handle.await.unwrap():
use std::time::Duration;fn main() {trpl::run(async {let handle = trpl::spawn_task(async {for i in 1..10 {println!("hi numnber {i} from the first task!");trpl::sleep(Duration::from_millis(500)).await;}});for i in 1..5 {println!("hi number {i} from the second task!");trpl::sleep(Duration::from_millis(500)).await; }handle.await.unwrap();});
}
執行結果:
hi number 1 from the second task!
hi numnber 1 from the first task!
hi number 2 from the second task!
hi numnber 2 from the first task!
hi numnber 3 from the first task!
hi number 3 from the second task!
hi number 4 from the second task!
hi numnber 4 from the first task!
hi numnber 5 from the first task!
hi numnber 6 from the first task!
hi numnber 7 from the first task!
hi numnber 8 from the first task!
hi numnber 9 from the first task!
消息傳遞
再使用前面講過的消息傳遞的例子,這次使用 future
演示線程間消息傳遞,來看看基于 future
的并發和基于線程的并發的差異。
trpl
中的 rx.recv()
返回一個future
,是異步的。之前我們使用let s = rx.recv();
是同步阻塞的。
let s: Result<String, mpsc::RecvError> = rx.recv();
代碼示例:
fn main() {trpl::run(async {let (tx, mut rx) = trpl::channel();let val = String ::from("hi");tx.send(val).unwrap();// trpl channel rx.recv() 返回的是一個future, 是異步非阻塞版本let received = rx.recv().await.unwrap();println!("get: {received}");});}
上面的代碼中,發送到接收都是順序執行的也就是同步的,因為它們都在同一個async
代碼塊當中。接下來修改一下代碼,我們發送多個消息,讓多個消息異步發送和接收,而不是都發送完才可以接收。
將發送端和接收端分別放到各自的async
塊中,返回兩個future,再使用trpl::join(),返回一個新的future.,再調用await等待兩個future完成。
代碼示例:
use std::time::Duration;fn main() {trpl::run(async {let (tx, mut rx) = trpl::channel();// 發送放到一個future 中let tx_future = async move {let vals = vec![String::from("Hi"),String::from("from"),String::from("the"),String::from("future"),];for val in vals {tx.send(val).unwrap();trpl::sleep(Duration::from_millis(500)).await;}};let rx_future = async {while let Some(value) = rx.recv().await {println!("received: {value}");}};// 使用 join 接收兩個future,返回一個新的futuretrpl::join(tx_future, rx_future).await;});}
執行結果:
每隔500 ms 接收一個消息并打印。
received: Hi
received: from
received: the
received: future
let tx_future = async move {
這里使用了move 關鍵字,將 tx
移動(move)進異步代碼塊,它會在代碼塊結束后立刻被丟棄,這樣tx
銷毀了,rx
也就在接收后優雅的關閉。
多生產者代碼示例:
use std::time::Duration;fn main() {trpl::run(async {let (tx, mut rx) = trpl::channel();let tx1 = tx.clone();// 發送放到一個future 中let tx1_future = async move {let vals = vec![String::from("Hi"),String::from("from"),String::from("the"),String::from("future"),];for val in vals {tx1.send(val).unwrap();trpl::sleep(Duration::from_millis(500)).await;}};let rx_future = async {while let Some(value) = rx.recv().await {println!("received: {value}");}};let tx_future = async move {let vals = vec![String::from("Hi"),String::from("from"),String::from("the"),String::from("future"),];for val in vals {tx.send(val).unwrap();trpl::sleep(Duration::from_millis(500)).await;}};// 使用 join 接收兩個future,返回一個新的futuretrpl::join3(tx1_future, tx_future, rx_future).await;});}
后續
本文討論了并發和人并行的區別,也講了 future,await 再異步編程中的作用,future 代表未來會返回結果值的一個變量,await表示要等待future返回結果。
本文記錄根據Rust程序設計語言(Rust 中文社區翻譯)學習筆記,但是發現這個網頁版電子書,異步這里講的很抽象,后續經過更深入的學習會再更新異步編程的部分。