1. Streams:異步數據流
1.1 Streams 與 Iterator
的異同
Rust 的 Iterator
是同步的,通過 next()
方法逐個獲取數據。而 Stream
是 async
版本的 Iterator
,它使用 next().await
來獲取數據項。
示例:將 Iterator
轉換為 Stream
use trpl::{stream_from_iter, StreamExt};let numbers = vec![1, 2, 3, 4, 5];
let stream = stream_from_iter(numbers.into_iter());while let Some(value) = stream.next().await {println!("Received: {}", value);
}
此示例中:
stream_from_iter()
將Iterator
轉換為Stream
。- 通過
stream.next().await
按順序異步獲取數據項。
2. 組合 Streams
2.1 構建 Stream
處理異步消息
在實際應用中,我們經常需要從網絡、數據庫或消息隊列中接收數據。這時,可以用 trpl::channel
創建 Stream
來異步處理數據。
use trpl::{channel, ReceiverStream};fn get_messages() -> impl Stream<Item = String> {let (tx, rx) = channel();spawn_task(async move {for letter in "abcdefghij".chars() {tx.send(letter.to_string()).await.unwrap();}});ReceiverStream::new(rx)
}while let Some(msg) = get_messages().next().await {println!("Message: {}", msg);
}
get_messages
返回一個Stream
,每次next().await
便能獲取新的數據項。- 通過
spawn_task
啟動異步任務,定期向Stream
發送數據。
3. 控制 Stream
速率與超時
3.1 timeout
:為 Stream
設置超時
當處理外部數據時,我們可能希望對每個 Stream
數據項設定超時時間,以避免某個數據源長時間無響應。
use trpl::{StreamExt, sleep, Duration};let messages = get_messages().timeout(Duration::from_millis(200));while let Some(result) = messages.next().await {match result {Ok(msg) => println!("Message: {}", msg),Err(_) => println!("Timeout occurred!"),}
}
timeout()
方法為Stream
每個數據項設置超時時間。- 當數據在 200ms 內到達時,正常輸出,否則觸發超時邏輯。
3.2 throttle
:限制 Stream
處理速率
有時,我們希望 Stream
以固定的速率生成數據,而不是盡可能快地處理。
use trpl::StreamExt;let throttled_messages = get_messages().throttle(Duration::from_millis(100));
throttle()
方法限制Stream
處理頻率,每 100ms 處理一個數據項。- 避免
Stream
過快地填充下游處理邏輯。
4. 合并多個 Streams
4.1 merge
:合并多個 Stream
在某些情況下,我們可能有多個 Stream
數據源,例如:
- 一個 Stream 處理用戶輸入
- 一個 Stream 處理傳感器數據
可以使用 merge()
將它們合并到一個 Stream
,以便統一處理:
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals().map(|i| format!("Interval: {}", i));let merged = messages.merge(intervals);while let Some(event) = merged.next().await {println!("Received: {}", event);
}
messages
處理異步消息,帶 200ms 超時。intervals
生成時間間隔數據(Interval: 1
,Interval: 2
, …)。merge()
方法合并兩個Stream
,同時接收消息和時間間隔。
4.2 take
:限制 Stream
處理的項數
有時,我們希望 Stream
只處理有限數量的數據項。例如,限制為 10 條:
let limited_stream = merged.take(10);
這樣,merged
只會輸出 10 條數據,然后 Stream
自動結束。
5. 處理 Stream
可能的錯誤
在異步系統中,消息通道的 send
操作可能會失敗,例如 tx.send(msg).await.unwrap();
。
如果通道關閉,send
會返回 Err
。因此,我們應當合理地處理這些錯誤,而不是 unwrap()
。
if let Err(e) = tx.send(msg).await {println!("Error sending message: {:?}", e);break;
}
在真實應用中,應當根據錯誤類型采取適當的恢復策略,而不是直接 break
退出。
6. 總結
Stream
適用于異步數據流,類似Iterator
,但支持await
。timeout
可為Stream
每個數據項設置超時時間。throttle
限制Stream
生成數據的速率。merge
將多個Stream
合并,便于處理多個數據源。take
限制Stream
處理的最大數據項數。- 合理處理
send
失敗,避免異步任務意外崩潰。
🚀 適用場景:
- 處理 WebSocket、Kafka、數據庫監聽 等 流式數據。
- 限流
API
調用,避免發送太多請求。 - 處理用戶事件流,如 鍵盤輸入、鼠標點擊。
通過 Stream
及其擴展方法,我們可以輕松構建高效的異步數據處理系統。Rust 提供了強大的 async
生態,讓我們能更輕松地編寫安全、高性能的并發代碼!