并發編程
并發編程允許程序同時執行多個獨立的任務,充分利用現代多核處理器的性能。Rust 提供了強大的并發原語,同時通過類型系統和所有權規則在編譯時防止數據競爭和其他常見的并發錯誤。在本章中,我們將探索 Rust 的并發編程模型。
線程基礎
創建線程
Rust 標準庫提供了 std::thread
模塊,用于創建和管理線程:
use std::thread;
use std::time::Duration;fn main() {// 創建一個新線程let handle = thread::spawn(|| {for i in 1..10 {println!("在新線程中: {}", i);thread::sleep(Duration::from_millis(1));}});// 主線程中的代碼for i in 1..5 {println!("在主線程中: {}", i);thread::sleep(Duration::from_millis(1));}// 等待新線程完成handle.join().unwrap();
}
thread::spawn
函數接受一個閉包,該閉包包含要在新線程中執行的代碼,并返回一個 JoinHandle
。調用 join
方法會阻塞當前線程,直到新線程完成。
線程與所有權
當我們將閉包傳遞給 thread::spawn
時,Rust 需要知道閉包將在哪個線程中運行以及它將使用哪些數據。閉包默認會捕獲其環境中的變量,但在線程間傳遞數據時,我們需要考慮所有權問題。
use std::thread;fn main() {let v = vec![1, 2, 3];// 錯誤:Rust 無法確定 v 的生命周期// let handle = thread::spawn(|| {// println!("這是向量: {:?}", v);// });// 使用 move 關鍵字轉移所有權let handle = thread::spawn(move || {println!("這是向量: {:?}", v);});// 錯誤:v 的所有權已經轉移到新線程// println!("向量: {:?}", v);handle.join().unwrap();
}
使用 move
關鍵字可以強制閉包獲取其使用的值的所有權,而不是借用它們。這對于確保數據在線程運行期間有效非常重要。
消息傳遞
一種處理并發的流行方法是消息傳遞,其中線程或執行者通過發送消息進行通信。Rust 的標準庫提供了通道(channel)實現,這是一種實現消息傳遞并發的方式。
創建通道
use std::sync::mpsc;
use std::thread;fn main() {// 創建一個通道let (tx, rx) = mpsc::channel();// 在新線程中發送消息thread::spawn(move || {let val = String::from("你好");tx.send(val).unwrap();// 錯誤:val 的所有權已轉移// println!("val 是 {}", val);});// 在主線程中接收消息let received = rx.recv().unwrap();println!("收到: {}", received);
}
mpsc
代表"多生產者,單消費者"(multiple producers, single consumer)。通道有兩部分:發送端(tx
)和接收端(rx
)。
發送多個值
use std::sync::mpsc;
use std::thread;
use std::time::Duration;fn main() {let (tx, rx) = mpsc::channel();thread::spawn(move || {let vals = vec![String::from("你好"),String::from("來自"),String::from("線程"),];for val in vals {tx.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});for received in rx {println!("收到: {}", received);}
}
多個生產者
通過克隆發送者,我們可以有多個生產者:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;fn main() {let (tx, rx) = mpsc::channel();let tx1 = tx.clone();thread::spawn(move || {let vals = vec![String::from("你好"),String::from("來自"),String::from("線程1"),];for val in vals {tx1.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});thread::spawn(move || {let vals = vec![String::from("更多"),String::from("來自"),String::from("線程2"),];for val in vals {tx.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});for received in rx {println!("收到: {}", received);}
}
共享狀態
另一種處理并發的方法是允許多個線程訪問同一塊數據。這種方法需要特別小心,以避免數據競爭。
互斥鎖(Mutex)
互斥鎖(Mutex,mutual exclusion)確保在任何時刻只有一個線程可以訪問數據:
use std::sync::Mutex;fn main() {let m = Mutex::new(5);{let mut num = m.lock().unwrap();*num = 6;} // 鎖在這里被釋放println!("m = {:?}", m);
}
在線程間共享 Mutex
use std::sync::{Arc, Mutex};
use std::thread;fn main() {let counter = Arc::new(Mutex::new(0));let mut handles = vec![];for _ in 0..10 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {let mut num = counter.lock().unwrap();*num += 1;});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("結果: {}", *counter.lock().unwrap());
}
Arc<T>
是一個原子引用計數類型,允許在線程間安全地共享所有權。它類似于 Rc<T>
,但可以在并發環境中使用。
死鎖
使用互斥鎖時需要注意死鎖問題,當兩個線程各自持有一個鎖并嘗試獲取對方的鎖時,就會發生死鎖:
use std::sync::{Mutex, MutexGuard};
use std::thread;
use std::time::Duration;fn main() {let mutex_a = Mutex::new(5);let mutex_b = Mutex::new(5);let thread_a = thread::spawn(move || {// 線程 A 先鎖定 mutex_alet mut a: MutexGuard<i32> = mutex_a.lock().unwrap();println!("線程 A 獲取了 mutex_a");// 睡眠一段時間,讓線程 B 有機會鎖定 mutex_bthread::sleep(Duration::from_millis(100));// 線程 A 嘗試鎖定 mutex_bprintln!("線程 A 嘗試獲取 mutex_b");let mut b: MutexGuard<i32> = mutex_b.lock().unwrap();*a += *b;});let thread_b = thread::spawn(move || {// 線程 B 先鎖定 mutex_blet mut b: MutexGuard<i32> = mutex_b.lock().unwrap();println!("線程 B 獲取了 mutex_b");// 睡眠一段時間,讓線程 A 有機會鎖定 mutex_athread::sleep(Duration::from_millis(100));// 線程 B 嘗試鎖定 mutex_aprintln!("線程 B 嘗試獲取 mutex_a");let mut a: MutexGuard<i32> = mutex_a.lock().unwrap();*b += *a;});// 注意:這個例子會導致死鎖!
}
讀寫鎖(RwLock)
讀寫鎖允許多個讀取器或一個寫入器訪問數據:
use std::sync::RwLock;fn main() {let lock = RwLock::new(5);// 多個讀取器可以同時訪問數據{let r1 = lock.read().unwrap();let r2 = lock.read().unwrap();println!("讀取器: {} {}", r1, r2);} // 讀鎖在這里被釋放// 只能有一個寫入器{let mut w = lock.write().unwrap();*w += 1;println!("寫入后: {}", *w);} // 寫鎖在這里被釋放
}
原子類型
對于簡單的計數器和標志,可以使用原子類型,它們提供了無鎖的線程安全操作:
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;fn main() {let counter = Arc::new(AtomicUsize::new(0));let mut handles = vec![];for _ in 0..10 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {for _ in 0..1000 {counter.fetch_add(1, Ordering::SeqCst);}});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("結果: {}", counter.load(Ordering::SeqCst));
}
Ordering
參數指定內存順序約束:
Relaxed
:最寬松的順序,只保證原子性Release
:寫操作使用,確保之前的操作不會被重排到此操作之后Acquire
:讀操作使用,確保之后的操作不會被重排到此操作之前AcqRel
:結合了Acquire
和Release
SeqCst
:最嚴格的順序,提供全序關系
條件變量
條件變量允許線程等待某個條件變為真:
use std::sync::{Arc, Mutex, Condvar};
use std::thread;fn main() {let pair = Arc::new((Mutex::new(false), Condvar::new()));let pair2 = Arc::clone(&pair);thread::spawn(move || {let (lock, cvar) = &*pair2;let mut started = lock.lock().unwrap();println!("改變條件變量之前");*started = true;// 通知等待的線程cvar.notify_one();println!("條件變量已改變");});let (lock, cvar) = &*pair;let mut started = lock.lock().unwrap();// 等待條件變為真while !*started {started = cvar.wait(started).unwrap();}println!("條件已滿足,主線程繼續執行");
}
屏障(Barrier)
屏障確保多個線程在某一點同步:
use std::sync::{Arc, Barrier};
use std::thread;fn main() {let mut handles = Vec::with_capacity(10);let barrier = Arc::new(Barrier::new(10));for i in 0..10 {let b = Arc::clone(&barrier);handles.push(thread::spawn(move || {println!("線程 {} 開始工作", i);// 模擬工作thread::sleep(std::time::Duration::from_millis(i * 100));println!("線程 {} 到達屏障", i);// 等待所有線程到達屏障b.wait();println!("線程 {} 繼續執行", i);}));}for handle in handles {handle.join().unwrap();}
}
線程池
創建線程有開銷,線程池可以重用線程,提高性能:
use std::sync::{mpsc, Arc, Mutex};
use std::thread;struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>,
}type Job = Box<dyn FnOnce() + Send + 'static>;impl ThreadPool {fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);self.sender.send(job).unwrap();}
}struct Worker {id: usize,thread: thread::JoinHandle<()>,
}impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {let job = receiver.lock().unwrap().recv().unwrap();println!("工作線程 {} 獲得了一個任務", id);job();});Worker { id, thread }}
}fn main() {let pool = ThreadPool::new(4);for i in 0..8 {pool.execute(move || {println!("執行任務 {}", i);thread::sleep(std::time::Duration::from_secs(1));println!("任務 {} 完成", i);});}// 給線程池一些時間來處理任務thread::sleep(std::time::Duration::from_secs(10));
}
并發最佳實踐
1. 優先使用消息傳遞
當可能時,優先使用消息傳遞而不是共享狀態:
use std::sync::mpsc;
use std::thread;fn main() {let (tx, rx) = mpsc::channel();// 啟動工作線程for i in 0..4 {let tx = tx.clone();thread::spawn(move || {// 模擬工作let result = i * i;tx.send(result).unwrap();});}// 丟棄原始發送者drop(tx);// 收集結果let mut results = Vec::new();for received in rx {results.push(received);}println!("結果: {:?}", results);
}
2. 使用適當的同步原語
根據需求選擇合適的同步原語:
- 對于簡單計數器:使用
AtomicUsize
- 對于需要獨占訪問的數據:使用
Mutex
- 對于讀多寫少的數據:使用
RwLock
- 對于一次性初始化:使用
lazy_static
和OnceCell
/OnceLock
3. 避免過度同步
過度同步會導致性能下降:
use std::sync::{Arc, Mutex};
use std::thread;// 不好的做法:鎖的粒度太大
fn process_data_bad(data: &[i32]) -> i32 {let result = Arc::new(Mutex::new(0));let mut handles = vec![];for chunk in data.chunks(100) {let result = Arc::clone(&result);let chunk = chunk.to_vec();handles.push(thread::spawn(move || {// 計算和let sum: i32 = chunk.iter().sum();// 獲取鎖并更新結果let mut result = result.lock().unwrap();*result += sum;}));}for handle in handles {handle.join().unwrap();}*result.lock().unwrap()
}// 好的做法:減少鎖的競爭
fn process_data_good(data: &[i32]) -> i32 {let mut handles = vec![];for chunk in data.chunks(100) {let chunk = chunk.to_vec();handles.push(thread::spawn(move || {// 計算和并返回chunk.iter().sum::<i32>()}));}// 收集結果let mut final_result = 0;for handle in handles {final_result += handle.join().unwrap();}final_result
}
4. 使用線程局部存儲
對于每個線程需要獨立狀態的情況,使用線程局部存儲:
use std::cell::RefCell;
use std::thread;
use std::thread_local;thread_local! {static COUNTER: RefCell<u32> = RefCell::new(0);
}fn main() {let mut handles = vec![];for _ in 0..5 {handles.push(thread::spawn(|| {for _ in 0..10 {COUNTER.with(|c| {*c.borrow_mut() += 1;});}let result = COUNTER.with(|c| *c.borrow());println!("線程計數器: {}", result);}));}for handle in handles {handle.join().unwrap();}
}
5. 使用 parking_lot 庫
parking_lot
庫提供了更高性能的同步原語:
// 在 Cargo.toml 中添加:
// [dependencies]
// parking_lot = "0.12.0"use parking_lot::{Mutex, RwLock};
use std::sync::Arc;
use std::thread;fn main() {let counter = Arc::new(Mutex::new(0));let mut handles = vec![];for _ in 0..10 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {let mut num = counter.lock();*num += 1;});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("結果: {}", *counter.lock());
}
練習題
-
實現一個并發計數器,使用不同的同步原語(
Mutex
、RwLock
、AtomicUsize
)并比較它們的性能。 -
創建一個簡單的生產者-消費者系統,其中多個生產者線程生成隨機數,多個消費者線程計算這些數字的平方并打印結果。
-
實現一個并發哈希表,允許多個線程同時讀取,但只允許一個線程寫入。使用適當的同步原語確保線程安全。
-
編寫一個程序,使用屏障(
Barrier
)同步多個線程,讓它們同時開始執行一個計算密集型任務,并測量完成時間。 -
實現一個簡單的線程池,可以提交任務并等待所有任務完成。包括一個優雅的關閉機制,確保所有任務都能完成。
總結
在本章中,我們探討了 Rust 的并發編程模型:
- 線程基礎和所有權規則
- 消息傳遞并發(通道)
- 共享狀態并發(互斥鎖、讀寫鎖、原子類型)
- 條件變量和屏障
- 線程池實現
- 并發編程最佳實踐
Rust 的類型系統和所有權規則使得并發編程更加安全,在編譯時就能捕獲許多常見的并發錯誤。通過選擇適當的并發模型和同步原語,你可以編寫高效、安全的并發代碼。在下一章中,我們將探索 Rust 的異步編程模型,它提供了一種更輕量級的并發方式。