?? 歡迎大家來到景天科技苑??
🎈🎈 養成好習慣,先贊后看哦~🎈🎈
🏆 作者簡介:景天科技苑
🏆《頭銜》:大廠架構師,華為云開發者社區專家博主,阿里云開發者社區專家博主,CSDN全棧領域優質創作者,掘金優秀博主,51CTO博客專家等。
🏆《博客》:Rust開發,Python全棧,Golang開發,云原生開發,PyQt5和Tkinter桌面開發,小程序開發,人工智能,js逆向,App逆向,網絡系統安全,數據分析,Django,fastapi,flask等框架,云原生K8S,linux,shell腳本等實操經驗,網站搭建,數據庫等分享。所屬的專欄:Rust高性能并發編程
景天的主頁:景天科技苑
文章目錄
- Rust線程池
- rayon 線程池
- 一、Rayon核心API詳解
- 1.1 常用方法
- 1.2 常見并行組合子(Combinators)
- 1.3 自定義并行任務
- 二、Rayon線程池ThreadPoolBuilder
- 1. new() 方法
- 2. num_threads() 方法
- 3. thread_name() 方法
- 4. build() 方法
- 5. build_global 方法
- 6. 其他方法
- 7. 它還提供了一些回調函數的設置
- 9. 注意事項
Rust線程池
線程池是一種并發編程的設計模式,它由一組預先創建的線程組成,用于執行多個任務。
線程池的主要作用是在任務到達時,重用已創建的線程,避免頻繁地創建和銷毀線程,從而提高系統的性能和資源利用率。
線程池通常用于需要處理大量短期任務或并發請求的應用程序。
線程池的優勢包括:
? 減少線程創建和銷毀的開銷:線程的創建和銷毀是一項昂貴的操作,線程池通過重用線程減少了這些開銷,提高了系統的響應速度和效率。
? 控制并發度:線程池可以限制同時執行的線程數量,從而有效控制系統的并發度,避免資源耗盡和過度競爭。
? 任務調度和負載均衡:線程池使用任務隊列和調度算法來管理和分配任務,確保任務按照合理的方式分配給可用的線程,實現負載均衡和最優的資源利用。
rayon 線程池
Rayon 是 Rust 中的一個并行計算庫,它可以讓你更容易地編寫并行代碼,以充分利用多核處理器。
Rayon 提供了一種簡單的 API,允許你將迭代操作并行化,從而加速處理大規模數據集的能力。
除了這些核心功能外,它還提供構建線程池的能力。
rayon::ThreadPoolBuilder 是 Rayon 庫中的一個結構體,用于自定義和配置 Rayon線程池的行為。
線程池是 Rayon 的核心部分,它管理并行任務的執行。
通過使用ThreadPoolBuilder,你可以根據你的需求定制 Rayon 線程池的行為,以便更好地適應你的并行計算任務。
在創建線程池之后,你可以使用 Rayon 提供的方法來并行執行任務,利用多核處理器的性能優勢。
一、Rayon核心API詳解
Rayon最核心的API是并行迭代器(ParallelIterator),其中包含豐富的方法集。
1.1 常用方法
par_iter():創建并行迭代器(只讀)。
par_iter_mut():創建可變并行迭代器。
into_par_iter():消耗數據源,創建并行迭代器。
示例:
use rayon::prelude::*;fn main() {let mut nums = vec![10, 20, 30, 40];// 并行修改元素值nums.par_iter_mut().for_each(|x| *x += 1);println!("{:?}", nums);
}
不能直接在 par_iter 中嵌套 par_iter,否則會阻塞或 panic。使用獨立線程池可以避免嵌套并行死鎖。
1.2 常見并行組合子(Combinators)
map():并行映射。
filter():并行過濾。
reduce():并行歸約,合并結果。
fold():類似reduce,但支持初始狀態和結果合并。
find_any() / find_first():并行查找元素。
示例(并行篩選與轉換):
use rayon::prelude::*;fn main() {let nums = (0..1_000_000).collect::<Vec<_>>();// 并行篩選出偶數并求平方let squares: Vec<_> = nums.par_iter().filter(|&&x| x % 2 == 0).map(|&x| x * x).collect();println!("篩選后元素個數:{}", squares.len());
}
這段代碼,報錯就是要計算的數據超過i32類型的最大值導致的
我們在創建squares的時候,類型Vec<_>,編譯器會默認為i32,計算的數據很大,迭代0到1000000,然后計算偶數的平方,超過i32最大值,導致報錯
🚩 解決方案:
創建squares時,指定更大的數據類型:
use rayon::prelude::*;fn main() {let nums = (0..1_000_000).collect::<Vec<_>>();// 并行篩選出偶數并求平方//將squares指定更大的數據類型let squares: Vec<u128> = nums.par_iter().filter(|&&x| x % 2 == 0).map(|&x| x * x).collect();println!("篩選后元素個數:{}", squares.len());
}
1.3 自定義并行任務
Rayon提供了更底層的接口,讓你可以手動并行執行任務。
1.3.1 join方法
執行兩個并行任務,等待任務完成后繼續執行。
use rayon::join;fn fib(n: usize) -> usize {if n < 2 {return n;}let (a, b) = join(|| fib(n - 1),|| fib(n - 2));a + b
}fn main() {let result = fib(20);println!("斐波那契數:{}", result);
}
1.3.2 scope方法
并行執行多個互相獨立的任務,生命周期靈活控制。
use rayon::scope;fn main() {let mut a = 0;let mut b = 0;scope(|s| {s.spawn(|_| {a = expensive_compute_a();});s.spawn(|_| {b = expensive_compute_b();});});println!("結果:{}, {}", a, b);
}fn expensive_compute_a() -> i32 {100
}
fn expensive_compute_b() -> i32 {200
}
二、Rayon線程池ThreadPoolBuilder
rayon::ThreadPoolBuilder 是 Rayon 庫中用于 自定義線程池配置 的結構體,適用于對并發行為有更精細控制需求的場景。
Rayon 默認使用一個全局線程池(rayon::spawn、par_iter 默認使用它),但在某些情況下,我們希望:
控制線程池線程數量;
設置線程名、線程棧大小;
使用多個獨立的線程池隔離并發任務;
嵌套 Rayon 調用時避免死鎖(多線程池互不干擾);
這時,就需要用到 ThreadPoolBuilder。
ThreadPoolBuilder 是以設計模式中的構建者模式設計的, 以下是一些ThreadPoolBuilder 的主要方法:
1. new() 方法
創建一個新的 ThreadPoolBuilder 實例
use rayon::ThreadPoolBuilder;
fn main() {let builder = ThreadPoolBuilder::new();
}
2. num_threads() 方法
設置線程池的線程數量。
你可以通過這個方法指定線程池中的線程數,以控制并行度。
默認情況下,Rayon 會根據 CPU 內核數量自動設置線程數。
use rayon::ThreadPoolBuilder;
fn main() {let builder = ThreadPoolBuilder::new().num_threads(4); //設置線程池中的線程數量為4
}
3. thread_name() 方法
為線程池中的線程設置一個名稱,這可以幫助你在調試時更容易識別線程。
use rayon::ThreadPoolBuilder;
fn main() {let builder = ThreadPoolBuilder::new().thread_name(|i| format!("worker-{}", i));
}
查看每次執行的線程名
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use std::thread;fn main() {let pool = ThreadPoolBuilder::new().num_threads(4).thread_name(|i| format!("my-pool-{}", i)).build().unwrap();pool.install(|| {let v: Vec<_> = (0..1000).into_par_iter().map(|x| {// 獲取當前線程名let thread_thread = thread::current();let thread_name = thread_thread.name().unwrap_or("unknown");println!("元素 {} 由線程 {} 處理", x, thread_name);x * x}).collect();println!("結果: {:?}", &v[..10]);});
}
4. build() 方法
通過 build 方法來創建線程池。
這個方法會將之前的配置應用于線程池并返回一個 rayon::ThreadPool 實例。
use rayon::ThreadPoolBuilder;
fn main() {let pool = ThreadPoolBuilder::new().num_threads(4).thread_name(|i| format!("worker-{}", i)).build().unwrap(); // 使用unwrap()來處理潛在的錯誤
}
5. build_global 方法
通過 build_global 方法創建一個全局的線程池
不推薦你主動調用這個方法初始化全局的線程池,使用默認的配置就好,記得全局的線程池只會初始化一次。多次調用會 panic
fn main() {rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap();
}
6. 其他方法
ThreadPoolBuilder 還提供了其他一些方法,用于配置線程池的行為,
如 stack_size() 用于設置線程棧的大小。
設置每個線程的棧大小(單位:字節):
let builder = rayon::ThreadPoolBuilder::new().stack_size(8 * 1024 * 1024); // 8MB
適用于遞歸深度大、調用棧復雜的程序。
7. 它還提供了一些回調函數的設置
start_handler() 用于設置線程啟動時的回調函數等。
線程啟動時調用的回調函數,可以用于初始化日志、TLS 等:
let builder = rayon::ThreadPoolBuilder::new().start_handler(|idx| {println!("線程 {} 啟動", idx);});
spawn_handler 實現定制化的函數來產生線程。
panic_handler 提供對panic 處理的回調函數。
exit_handler 提供線程退出時的回調。
let builder = rayon::ThreadPoolBuilder::new().exit_handler(|idx| {println!("線程 {} 退出", idx);});
下面這個例子演示了使用 rayon 線程池計算斐波那契數列:
//使用ranyon線程池計算斐波那契數列
fn fib(n: u128) -> u128 {if n == 0 {return 0;}if n == 1 {return 1;}let (a, b) = rayon::join(|| fib(n - 1),|| fib(n - 2));a + b
}fn rayon_threadpool() {let pool = rayon::ThreadPoolBuilder::new().num_threads(10).build().unwrap();pool.install(|| {let result = fib(20);println!("result = {}", result);});
}fn main() {rayon_threadpool();
}
? rayon::ThreadPoolBuilder 用來創建一個線程池。設置使用 10 個線程
? pool.install() 在線程池中運行 fib
? rayon::join 用于并行執行兩個函數并等待它們的結果。它使得你可以同時執行兩個獨立的任務,然后等待它們都完成,以便將它們的結果合并到一起。
通過在 join 中傳入 fib 遞歸任務, 實現并行計算 fib 數列
與直接 spawn thread 相比, 使用 rayon 的線程池有以下優點:
? 線程可重用, 避免頻繁創建/銷毀線程的開銷
? 線程數可配置, 一般根據 CPU 核心數設置
? 避免大量線程造成資源競爭問題
9. 注意事項
build_global() 只能調用一次;多次調用會 panic;
自定義線程池的 .install() 不能跨線程池嵌套調用 .par_iter();
不建議將阻塞操作(如IO)放入線程池中執行,Rayon主要用于CPU密集型任務;
一旦線程池創建完成,線程數不可更改,需重新構建。