Rust Async 并發編程:處理任意數量的 Future 與 Stream

1. Streams:異步數據流

1.1 Streams 與 Iterator 的異同

Rust 的 Iterator 是同步的,通過 next() 方法逐個獲取數據。而 Streamasync 版本的 Iterator,它使用 next().await 來獲取數據項。

示例:將 Iterator 轉換為 Stream

use trpl::{stream_from_iter, StreamExt};let numbers = vec![1, 2, 3, 4, 5];
let stream = stream_from_iter(numbers.into_iter());while let Some(value) = stream.next().await {println!("Received: {}", value);
}

此示例中:

  • stream_from_iter()Iterator 轉換為 Stream
  • 通過 stream.next().await 按順序異步獲取數據項。

2. 組合 Streams

2.1 構建 Stream 處理異步消息

在實際應用中,我們經常需要從網絡、數據庫或消息隊列中接收數據。這時,可以用 trpl::channel 創建 Stream 來異步處理數據。

use trpl::{channel, ReceiverStream};fn get_messages() -> impl Stream<Item = String> {let (tx, rx) = channel();spawn_task(async move {for letter in "abcdefghij".chars() {tx.send(letter.to_string()).await.unwrap();}});ReceiverStream::new(rx)
}while let Some(msg) = get_messages().next().await {println!("Message: {}", msg);
}
  • get_messages 返回一個 Stream,每次 next().await 便能獲取新的數據項。
  • 通過 spawn_task 啟動異步任務,定期向 Stream 發送數據。

3. 控制 Stream 速率與超時

3.1 timeout:為 Stream 設置超時

當處理外部數據時,我們可能希望對每個 Stream 數據項設定超時時間,以避免某個數據源長時間無響應。

use trpl::{StreamExt, sleep, Duration};let messages = get_messages().timeout(Duration::from_millis(200));while let Some(result) = messages.next().await {match result {Ok(msg) => println!("Message: {}", msg),Err(_) => println!("Timeout occurred!"),}
}
  • timeout() 方法為 Stream 每個數據項設置超時時間。
  • 當數據在 200ms 內到達時,正常輸出,否則觸發超時邏輯。

3.2 throttle:限制 Stream 處理速率

有時,我們希望 Stream 以固定的速率生成數據,而不是盡可能快地處理。

use trpl::StreamExt;let throttled_messages = get_messages().throttle(Duration::from_millis(100));
  • throttle() 方法限制 Stream 處理頻率,每 100ms 處理一個數據項。
  • 避免 Stream 過快地填充下游處理邏輯。

4. 合并多個 Streams

4.1 merge:合并多個 Stream

在某些情況下,我們可能有多個 Stream 數據源,例如:

  • 一個 Stream 處理用戶輸入
  • 一個 Stream 處理傳感器數據

可以使用 merge() 將它們合并到一個 Stream,以便統一處理:

let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals().map(|i| format!("Interval: {}", i));let merged = messages.merge(intervals);while let Some(event) = merged.next().await {println!("Received: {}", event);
}
  • messages 處理異步消息,帶 200ms 超時。
  • intervals 生成時間間隔數據(Interval: 1, Interval: 2, …)。
  • merge() 方法合并兩個 Stream,同時接收消息和時間間隔。

4.2 take:限制 Stream 處理的項數

有時,我們希望 Stream 只處理有限數量的數據項。例如,限制為 10 條:

let limited_stream = merged.take(10);

這樣,merged 只會輸出 10 條數據,然后 Stream 自動結束。

5. 處理 Stream 可能的錯誤

在異步系統中,消息通道的 send 操作可能會失敗,例如 tx.send(msg).await.unwrap();

如果通道關閉,send 會返回 Err。因此,我們應當合理地處理這些錯誤,而不是 unwrap()

if let Err(e) = tx.send(msg).await {println!("Error sending message: {:?}", e);break;
}

在真實應用中,應當根據錯誤類型采取適當的恢復策略,而不是直接 break 退出。


6. 總結

  • Stream 適用于異步數據流,類似 Iterator,但支持 await
  • timeout 可為 Stream 每個數據項設置超時時間。
  • throttle 限制 Stream 生成數據的速率。
  • merge 將多個 Stream 合并,便于處理多個數據源。
  • take 限制 Stream 處理的最大數據項數。
  • 合理處理 send 失敗,避免異步任務意外崩潰。

🚀 適用場景:

  • 處理 WebSocketKafka數據庫監聽流式數據
  • 限流 API 調用,避免發送太多請求。
  • 處理用戶事件流,如 鍵盤輸入、鼠標點擊

通過 Stream 及其擴展方法,我們可以輕松構建高效的異步數據處理系統。Rust 提供了強大的 async 生態,讓我們能更輕松地編寫安全、高性能的并發代碼!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/70990.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/70990.shtml
英文地址,請注明出處:http://en.pswp.cn/web/70990.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

藍橋杯 路徑之謎

路徑之謎 題目描述 小明冒充 XX 星球的騎士&#xff0c;進入了一個奇怪的城堡。 城堡里邊什么都沒有&#xff0c;只有方形石頭鋪成的地面。 假設城堡地面是 nnnn 個方格。如下圖所示。 按習俗&#xff0c;騎士要從西北角走到東南角。可以橫向或縱向移動&#xff0c;但不能斜著走…

3-5 WPS JS宏 工作表的移動與復制學習筆記

************************************************************************************************************** 點擊進入 -我要自學網-國內領先的專業視頻教程學習網站 *******************************************************************************************…

聊聊Java的SPI機制

個人自建博客地址 什么是SPI呢&#xff1f; SPI全稱Service Provider Interface&#xff0c;翻譯過來就是服務提供者接口。調用方提供接口聲明&#xff0c;服務提供方對接口進行實現&#xff0c;提供服務的一種機制&#xff0c;服務提供方往往是第三方或者是外部擴展。 下面…

langchain4j+local-ai小試牛刀

序 本文主要研究一下如何本地運行local-ai并通過langchain4j集成調用。 步驟 curl安裝 curl https://localai.io/install.sh | sh% Total % Received % Xferd Average Speed Time Time Time CurrentDload Upload Total Spent Left Speed 100 21509 …

什么是“零日漏洞”(Zero-Day Vulnerability)?為何這類攻擊被視為高風險威脅?

正文 零日漏洞&#xff08;Zero-Day Vulnerability&#xff09; 是指軟件、硬件或系統中存在的、尚未被開發者發現或修復的安全漏洞。攻擊者在開發者意識到漏洞存在之前&#xff08;即“零日”內&#xff09;利用該漏洞發起攻擊&#xff0c;因此得名。這類漏洞的“零日”特性使…

鴻蒙 ArkUI 實現 2048 小游戲

2048 是一款經典的益智游戲&#xff0c;玩家通過滑動屏幕合并相同數字的方塊&#xff0c;最終目標是合成數字 2048。本文基于鴻蒙 ArkUI 框架&#xff0c;詳細解析其實現過程&#xff0c;幫助開發者理解如何利用聲明式 UI 和狀態管理構建此類游戲。 一、核心數據結構與狀態管理…

Milvus高性能向量數據庫與大模型結合

Milvus | 高性能向量數據庫&#xff0c;為規模而構建Milvus 是一個為 GenAI 應用構建的開源向量數據庫。使用 pip 安裝&#xff0c;執行高速搜索&#xff0c;并擴展到數十億個向量。https://milvus.io/zh Milvus 是什么&#xff1f; Milvus 是一種高性能、高擴展性的向量數據…

kettle插件-自定義函數-數據脫敏

平常我們在使用kettle抽取數據的時候會涉及到敏感數據邀請脫敏或者進行掩碼的需求&#xff0c;今天我們使用自定義函數插件來實現這些需求。 1、將自定義函數插件&#xff08;kettle-func-plugin.zip&#xff09;解壓后放到kettle的plugins目錄下面&#xff0c;然后重啟服務。…

LeetCode 每日一題 2025/2/24-2025/3/2

記錄了初步解題思路 以及本地實現代碼&#xff1b;并不一定為最優 也希望大家能一起探討 一起進步 目錄 2/24 1656. 設計有序流2/25 2502. 設計內存分配器2/26 1472. 設計瀏覽器歷史記錄2/27 2296. 設計一個文本編輯器2/28 2353. 設計食物評分系統3/1 131. 分割回文串3/2 2/24 …

C++動態與靜態轉換區別詳解

文章目錄 前言一、 類型檢查的時機二、安全性三、適用場景四、代碼示例對比總結 前言 在 C 中&#xff0c;dynamic_cast 和 static_cast 是兩種不同的類型轉換操作符&#xff0c;主要區別體現在類型檢查的時機、安全性和適用場景上。以下是它們的核心區別&#xff1a; 一、 類…

探秘《矩陣之美》:解鎖矩陣的無限魅力

在這個數據驅動的時代&#xff0c;矩陣作為數學中的瑰寶&#xff0c;不僅在理論研究中占據核心地位&#xff0c;更在工程技術、計算機科學、物理學、經濟學等眾多領域發揮著不可替代的作用。今天&#xff0c;讓我們通過中科院大學耿修瑞老師&#xff08;中科院空天信息研究院研…

【MySQL】(2) 庫的操作

SQL 關鍵字&#xff0c;大小寫不敏感。 一、查詢數據庫 show databases; 注意加分號&#xff0c;才算一句結束。 二、創建數據庫 {} 表示必選項&#xff0c;[] 表示可選項&#xff0c;| 表示任選其一。 示例&#xff1a;建議加上 if not exists 選項。 三、字符集編碼和排序…

Vue3實現文件上傳、下載及預覽全流程詳解(含完整接口調用)

文章目錄 一、環境準備1.1 創建Vue3項目1.2 安裝依賴1.3 配置Element Plus 二、文件上傳實現2.1 基礎上傳組件2.2 自定義上傳邏輯&#xff08;Axios實現&#xff09; 三、文件下載實現3.1 直接下載&#xff08;已知文件URL&#xff09;3.2 后端接口下載&#xff08;二進制流&am…

分布式數據存儲:提升系統彈性與性能的技術之路

分布式數據存儲:提升系統彈性與性能的技術之路 在當今數據爆炸式增長的時代,傳統的單機存儲系統已無法滿足大規模、高并發、低延遲的需求。尤其是在大數據、云計算和物聯網的推動下,數據存儲面臨著前所未有的挑戰。分布式數據存儲應運而生,通過將數據分布在多個物理節點上…

在編譯Linux的內核鏡像和模塊時,必須先編譯內核鏡像,再編譯模塊,順序不可隨意調整的原因

問&#xff1a;在編譯Linux的內核鏡像和模塊時,必須先編譯內核鏡像,再編譯模塊,順序不可隨意調整 答&#xff1a;在編譯 Linux 內核和模塊時&#xff0c;必須先編譯內核鏡像&#xff0c;再編譯模塊&#xff0c;順序不可隨意調整。 原因&#xff1a; 模塊依賴內核的頭文件和符…

免費使用 DeepSeek API 教程及資源匯總

免費使用 DeepSeek API 教程及資源匯總 一、DeepSeek API 資源匯總1.1 火山引擎1.2 百度千帆1.3 阿里百煉1.4 騰訊云 二、其他平臺2.1 華為云2.2 硅基流動 三、總結 DeepSeek-R1 作為 2025 年初發布的推理大模型&#xff0c;憑借其卓越的邏輯推理能力和成本優勢&#xff0c;迅速…

千峰React:案例二

完成對html文檔還有css的引入&#xff0c;引入一下數據&#xff1a; import { func } from prop-types import ./購物車樣式.css import axios from axios import { useImmer } from use-immer import { useEffect } from reactfunction Item() {return (<li classNameacti…

用DeepSeek生成批量刪除處理 PDF第一頁工具

安裝依賴庫 在運行程序之前&#xff0c;請確保安裝所需的庫&#xff1a; pip install pymupdf python-docx Python 程序代碼 import os import fitz # PyMuPDF from docx import Documentdef delete_pdf_first_page(input_path, output_path):"""刪除 PDF…

redis的下載和安裝詳解

一、下載redis安裝包 進入redis官網查看當前穩定版本&#xff1a; https://redis.io/download/發現此時的穩定版本是6.2.4&#xff0c; 此時可以去這個網站下載6.2.4穩定版本的tar包。 暫時不考慮不在windows上使用redis&#xff0c;那樣將無法發揮redis的性能 二、上傳tar…

如何使用 Jenkins 實現 CI/CD 流水線:從零開始搭建自動化部署流程

如何使用 Jenkins 實現 CI/CD 流水線:從零開始搭建自動化部署流程 在軟件開發過程中,持續集成(CI)和持續交付(CD)已經成為現代開發和運維的標準實踐。隨著代碼的迭代越來越頻繁,傳統的手動部署方式不僅低效,而且容易出錯。為了提高開發效率和代碼質量,Jenkins作為一款…