await
Suspend execution until the result of a Future is ready.
暫停執行,直到一個 Future 的結果就緒。
.awaiting
a future will suspend the current function’s execution until the executor has run the future to completion.
對一個 Future 使用 .await
操作會暫停當前函數的執行,直到執行器(executor)將該 Future 運行至完成
Read the async book for details on how async/await and executors work.
有關異步 / 等待(async/await)和執行器的工作原理的詳細信息,請閱讀《異步編程指南》(Async Book)。
Editions
await is a keyword from the 2018 edition onwards.
await 是從 2018 版及后續版本開始引入的關鍵字。
It is available for use in stable Rust from version 1.39 onwards.
從 1.39 版本及以后的穩定版 Rust 中可以使用它。
AsyncReadExt
use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;#[tokio::main]
async fn main() -> io::Result<()> {let mut f = File::open("foo.txt").await?;let mut buffer = Vec::new();// 讀取整個文件的內容f.read_to_end(&mut buffer).await?;// String::from_utf8 是 String 類型的一個關聯函數// 專門用于把 Vec<u8> 類型的字節向量轉換為 String 類型的 UTF - 8 字符串// 它會檢查字節向量中的字節序列是否符合 UTF - 8 編碼規則// 如果符合則返回一個 Ok(String),若不符合則返回 Err(FromUtf8Error)// 適用于從字節數據(如文件讀取、網絡接收等)構建字符串,并且需要確保數據是有效的 UTF - 8 編碼match String::from_utf8(buffer) {Ok(content) => {println!("文件內容如下:\n{}", content);}Err(e) => {eprintln!("將文件內容轉換為字符串時出錯: {}", e);}}Ok(())
}
use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;#[tokio::main]
async fn main() -> io::Result<()> {let mut f = File::open("foo.txt").await?;let mut buffer = Vec::new();// 讀取整個文件的內容f.read_to_end(&mut buffer).await?;Ok(())
}
為什么說 字節數組 &[u8]
實現了 AsyncRead ?
字節數組切片 &[u8]
實現了 AsyncRead 特征,這意味著它可以作為異步讀取操作的數據源,允許以異步的方式從字節數組中讀取數據
AsyncRead 特征的作用
AsyncRead 是 tokio 異步運行時庫中定義的一個特征,它定義了異步讀取操作的接口。其核心方法是 poll_read,該方法用于嘗試從數據源中異步讀取數據到指定的緩沖區。通過實現 AsyncRead 特征,類型可以參與到異步 I/O 操作中,利用異步運行時的調度機制,在等待數據可讀時讓出控制權,提高程序的并發性能。
AsyncRead 特征的簡化定義如下:
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::Result;pub trait AsyncRead {fn poll_read(self: Pin<&mut Self>,cx: &mut Context<'_>,buf: &mut [u8],) -> Poll<Result<usize>>;
}
self 是實現該特征的類型的可變引用,使用 Pin 確保在異步操作過程中對象的內存位置不會改變
cx 是任務上下文,包含了任務的喚醒器等信息,用于在數據準備好時喚醒任務
buf 用于存儲讀取數據的緩沖區
Poll 枚舉表示操作的結果,可能是 Poll::Ready
表示操作已完成,返回實際讀取的字節數;也可能是 Poll::Pending
表示操作還未完成,需要等待。
&[u8] 實現 AsyncRead 的原因
靈活性和通用性:字節數組切片 &[u8]
是一種非常常見且靈活的數據表示方式,它可以表示內存中的一段連續字節數據。實現 AsyncRead 特征后,&[u8]
可以作為異步讀取操作的數據源,使得很多使用 AsyncRead 的代碼可以直接處理字節數組,無需額外的轉換。
例如,在測試代碼中,可以使用字節數組模擬文件或網絡數據進行異步讀取測試。
異步編程的一致性:在異步編程中,希望不同的數據源(如文件、網絡套接字、內存緩沖區等)都能以統一的方式進行異步讀取操作。通過讓 &[u8]
實現 AsyncRead 特征,保持了異步讀取操作的一致性,使得代碼更加簡潔和易于維護。
示例代碼
use tokio::io::{self, AsyncReadExt};#[tokio::main]
async fn main() -> io::Result<()> {// 定義一個字節數組,存儲了字符串 "Hello, World!" 的 UTF - 8 編碼字節數據let data = b"Hello, World!";let mut buffer = [0; 5];// 創建一個字節數組切片 &[u8],作為異步讀取的數據源let mut reader = &data[..];// 調用 read 方法(該方法是基于 AsyncRead 特征實現的),異步地從字節數組切片中讀取數據到 buffer 中// await 關鍵字用于等待讀取操作完成,最終返回實際讀取的字節數let n = reader.read(&mut buffer).await?;println!("Read {} bytes: {:?}", n, &buffer[..n]);Ok(())
}
實際應用場景
測試:在編寫異步 I/O 代碼的單元測試時,可以使用字節數組模擬不同的數據源,方便進行測試。例如,測試一個異步數據解析器時,可以使用字節數組提供測試數據。
內存數據處理:當需要對內存中的字節數據進行異步處理時,如對加密數據、壓縮數據等進行異步解密或解壓縮操作,可以直接使用字節數組作為數據源。
字節數組切片 &[u8]
實現 AsyncRead 特征,為異步編程提供了更多的靈活性和一致性,使得字節數組可以方便地作為異步讀取操作的數據源。
AsyncWriteExt
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;#[tokio::main]
async fn main() -> io::Result<()> {let mut file = File::create("foo.txt").await?;// 將一個 &str 字符串轉變成一個字節數組:&[u8;10]// 然后 write 方法又會將這個 &[u8;10] 的數組類型隱式強轉為數組切片: &[u8]let n = file.write(b"some bytes").await?;println!("Wrote the first {} bytes of 'some bytes'.", n);Ok(())
}
AsyncWriteExt::write_all
將緩沖區的內容全部寫入到寫入器中
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;#[tokio::main]
async fn main() -> io::Result<()> {let mut file = File::create("foo.txt").await?;file.write_all(b"some bytes").await?;Ok(())
}
tokio::io處理標準的輸入/輸出/錯誤
use tokio::fs::File;
use tokio::io;#[tokio::main]
async fn main() -> io::Result<()> {// &[u8] 是字節數組切片let mut reader: &[u8] = b"hello";let mut file = File::create("foo.txt").await?;// 異步的將讀取器( reader )中的內容拷貝到寫入器( writer )中// 字節數組 &[u8] 實現了 AsyncRead,所以這里可直接使用 readerio::copy(&mut reader, &mut file).await?;Ok(())
}
tokio::io分離讀寫器
錯誤寫法
io::copy(&mut socket, &mut socket).await
讀取器和寫入器都是同一個 socket,因此需要對其進行兩次可變借用,這明顯違背了 Rust 的借用規則
用同一個 socket 是不行的,為了實現目標功能,必須將 socket 分離成一個讀取器和寫入器
任何一個讀寫器( reader + writer )都可以使用 io::split
方法進行分離,最終返回一個讀取器和寫入器,兩者可以獨自使用,例如可以放入不同的任務中。
回聲服務端
use tokio::io;
use tokio::net::TcpListener;#[tokio::main]
async fn main() -> io::Result<()> {let listener = TcpListener::bind("127.0.0.1:6142").await?;loop {let (mut socket, _) = listener.accept().await?;tokio::spawn(async move {// split 操作和 io::copy 調用是在同一個異步任務上下文中執行的// 由于它們處于同一個任務中,所以不存在不同任務之間的數據傳遞開銷和同步問題// 任務的執行是連貫的,避免了因為跨任務操作而引入的額外復雜性和性能損耗let (mut rd, mut wr) = socket.split();if io::copy(&mut rd, &mut wr).await.is_err() {eprintln!("failed to copy");}});}
}
回聲客戶端
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;#[tokio::main]
async fn main() -> io::Result<()> {let socket = TcpStream::connect("127.0.0.1:6142").await?;let (mut rd, mut wr) = io::split(socket);// 創建異步任務,在后臺寫入數據tokio::spawn(async move {wr.write_all(b"hello\r\n").await?;wr.write_all(b"world\r\n").await?;// 有時,我們需要給予 Rust 一些類型暗示,它才能正確的推導出類型Ok::<_, io::Error>(())});let mut buf = vec![0; 128];loop {let n = rd.read(&mut buf).await?;if n == 0 {break;}println!("GOT {:?}", &buf[..n]);}Ok(())
}
tokio::spawn
是 Tokio 運行時提供的一個函數,用于創建一個新的異步任務并將其放入任務隊列中等待執行。這個新任務會在 Tokio 運行時的調度下異步執行,與當前代碼所在的任務是并發關系,而不是順序執行關系。
當執行到 tokio::spawn
時,它會立即將傳入的異步閉包包裝成一個新的異步任務并放入 Tokio 運行時的任務隊列中,然后代碼會繼續往下執行,不會等待這個新任務開始執行。
因此,let mut buf = vec![0; 128];
這行代碼會緊接著 tokio::spawn
之后執行,而 tokio::spawn
內部的異步任務會在 Tokio 運行時調度到它時才開始執行。
Tokio 運行時的調度是基于事件驅動和任務優先級的,它會根據系統資源和任務的狀態動態地決定哪個任務先執行。所以,tokio::spawn
內部的任務可能在 let mut buf = vec![0; 128];
之前執行,也可能在之后執行,甚至可能與后續代碼并發執行
假設 tokio::spawn
內部的任務執行需要一些時間(例如網絡延遲),而創建 buf 向量的操作很快,那么很可能 let mut buf = vec![0; 128];
會先執行,然后才輪到 tokio::spawn
內部的任務開始執行
C-S修正
cargo run --bin server.rs
cargo run --bin client.rs
上述代碼跑起來之后,服務端不退出的話,客戶端會一直卡住,客戶端加如下函數即可解決
wr.shutdown().await ? ;
手動實現io
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;#[tokio::main]
async fn main() -> io::Result<()> {let listener = TcpListener::bind("127.0.0.1:6142").await?;loop {let (mut socket, _) = listener.accept().await?;tokio::spawn(async move {// 此處的緩沖區是一個 Vec 動態數組,它的數據是存儲在堆上,而不是棧上// 若改成 let mut buf = [0; 1024];,則存儲在棧上// 一個數據如果想在 .await 調用過程中存在,那它必須存儲在當前任務內// buf 會在 .await 調用過程中被使用,因此它必須要存儲在任務內let mut buf = vec![0; 1024];loop {match socket.read(&mut buf).await {// 返回值 `Ok(0)` 說明對端已經關閉Ok(0) => return,Ok(n) => {// Copy the data back to socket// 將數據拷貝回 socket 中if socket.write_all(&buf[..n]).await.is_err() {// 非預期錯誤,由于我們這里無需再做什么,因此直接停止處理return;}}Err(_) => {// 非預期錯誤,由于我們無需再做什么,因此直接停止處理return;}}}});}
}
若該緩沖區數組創建在棧上,那每條連接所對應的任務的內部數據結構看上去可能如下所示
struct Task {task: enum {AwaitingRead {socket: TcpStream,buf: [BufferType],},AwaitingWriteAll {socket: TcpStream,buf: [BufferType],}}
}
棧數組要被使用,就必須存儲在相應的結構體內,其中兩個結構體分別持有了不同的棧數組 [BufferType]
這種方式會導致任務結構變得很大
一般選擇緩沖區長度往往會使用分頁長度(page size),因此使用棧數組會導致任務的內存大小變得很奇怪甚至糟糕:
$page-size + 一些額外的字節
編譯器會進一步優化 async 語句塊的布局,而不是像上面一樣簡單的使用 enum
在實踐中,變量也不會在枚舉成員間移動。但是再怎么優化,任務的結構體至少也會跟其中的棧數組一樣大
因此通常情況下,使用堆上的緩沖區會高效實用的多
當任務因為調度在線程間移動時,存儲在棧上的數據需要進行保存和恢復,過大的棧上變量會帶來不小的數據拷貝開銷
因此,存儲大量數據的變量最好放到堆上
處理 EOF
當 TCP 連接的讀取端關閉后,再調用 read 方法會返回 Ok(0)
。此時,再繼續下去已經沒有意義,因此需要退出循環。
忘記在 EOF 時退出讀取循環,是網絡編程中一個常見的 bug :
loop {match socket.read(&mut buf).await {Ok(0) => return,// ... 其余錯誤處理}
}
一旦讀取端關閉后,那后面的 read 調用就會立即返回 Ok(0)
,而不會阻塞等待,因此這種無阻塞循環會最終導致 CPU 立刻跑到 100%
,并將一直持續下去,直到程序關閉。
小節
實際上,io::split
可以用于任何同時實現了 AsyncRead 和 AsyncWrite 的值,它的內部使用了 Arc 和 Mutex 來實現相應的功能。如果大家覺得這種實現有些重,可以使用 Tokio 提供的 TcpStream,它提供了兩種方式進行分離:
TcpStream::split
會獲取字節流的引用,然后將其分離成一個讀取器和寫入器。但由于使用了引用的方式,它們倆必須和 split 在同一個任務中。 優點就是,這種實現沒有性能開銷,因為無需 Arc 和 Mutex。
TcpStream::into_split
還提供了一種分離實現,分離出來的結果可以在任務間移動,內部是通過 Arc 實現