Rust從入門到精通之進階篇:14.并發編程

并發編程

并發編程允許程序同時執行多個獨立的任務,充分利用現代多核處理器的性能。Rust 提供了強大的并發原語,同時通過類型系統和所有權規則在編譯時防止數據競爭和其他常見的并發錯誤。在本章中,我們將探索 Rust 的并發編程模型。

線程基礎

創建線程

Rust 標準庫提供了 std::thread 模塊,用于創建和管理線程:

use std::thread;
use std::time::Duration;fn main() {// 創建一個新線程let handle = thread::spawn(|| {for i in 1..10 {println!("在新線程中: {}", i);thread::sleep(Duration::from_millis(1));}});// 主線程中的代碼for i in 1..5 {println!("在主線程中: {}", i);thread::sleep(Duration::from_millis(1));}// 等待新線程完成handle.join().unwrap();
}

thread::spawn 函數接受一個閉包,該閉包包含要在新線程中執行的代碼,并返回一個 JoinHandle。調用 join 方法會阻塞當前線程,直到新線程完成。

線程與所有權

當我們將閉包傳遞給 thread::spawn 時,Rust 需要知道閉包將在哪個線程中運行以及它將使用哪些數據。閉包默認會捕獲其環境中的變量,但在線程間傳遞數據時,我們需要考慮所有權問題。

use std::thread;fn main() {let v = vec![1, 2, 3];// 錯誤:Rust 無法確定 v 的生命周期// let handle = thread::spawn(|| {//     println!("這是向量: {:?}", v);// });// 使用 move 關鍵字轉移所有權let handle = thread::spawn(move || {println!("這是向量: {:?}", v);});// 錯誤:v 的所有權已經轉移到新線程// println!("向量: {:?}", v);handle.join().unwrap();
}

使用 move 關鍵字可以強制閉包獲取其使用的值的所有權,而不是借用它們。這對于確保數據在線程運行期間有效非常重要。

消息傳遞

一種處理并發的流行方法是消息傳遞,其中線程或執行者通過發送消息進行通信。Rust 的標準庫提供了通道(channel)實現,這是一種實現消息傳遞并發的方式。

創建通道

use std::sync::mpsc;
use std::thread;fn main() {// 創建一個通道let (tx, rx) = mpsc::channel();// 在新線程中發送消息thread::spawn(move || {let val = String::from("你好");tx.send(val).unwrap();// 錯誤:val 的所有權已轉移// println!("val 是 {}", val);});// 在主線程中接收消息let received = rx.recv().unwrap();println!("收到: {}", received);
}

mpsc 代表"多生產者,單消費者"(multiple producers, single consumer)。通道有兩部分:發送端(tx)和接收端(rx)。

發送多個值

use std::sync::mpsc;
use std::thread;
use std::time::Duration;fn main() {let (tx, rx) = mpsc::channel();thread::spawn(move || {let vals = vec![String::from("你好"),String::from("來自"),String::from("線程"),];for val in vals {tx.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});for received in rx {println!("收到: {}", received);}
}

多個生產者

通過克隆發送者,我們可以有多個生產者:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;fn main() {let (tx, rx) = mpsc::channel();let tx1 = tx.clone();thread::spawn(move || {let vals = vec![String::from("你好"),String::from("來自"),String::from("線程1"),];for val in vals {tx1.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});thread::spawn(move || {let vals = vec![String::from("更多"),String::from("來自"),String::from("線程2"),];for val in vals {tx.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});for received in rx {println!("收到: {}", received);}
}

共享狀態

另一種處理并發的方法是允許多個線程訪問同一塊數據。這種方法需要特別小心,以避免數據競爭。

互斥鎖(Mutex)

互斥鎖(Mutex,mutual exclusion)確保在任何時刻只有一個線程可以訪問數據:

use std::sync::Mutex;fn main() {let m = Mutex::new(5);{let mut num = m.lock().unwrap();*num = 6;} // 鎖在這里被釋放println!("m = {:?}", m);
}

在線程間共享 Mutex

use std::sync::{Arc, Mutex};
use std::thread;fn main() {let counter = Arc::new(Mutex::new(0));let mut handles = vec![];for _ in 0..10 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {let mut num = counter.lock().unwrap();*num += 1;});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("結果: {}", *counter.lock().unwrap());
}

Arc<T> 是一個原子引用計數類型,允許在線程間安全地共享所有權。它類似于 Rc<T>,但可以在并發環境中使用。

死鎖

使用互斥鎖時需要注意死鎖問題,當兩個線程各自持有一個鎖并嘗試獲取對方的鎖時,就會發生死鎖:

use std::sync::{Mutex, MutexGuard};
use std::thread;
use std::time::Duration;fn main() {let mutex_a = Mutex::new(5);let mutex_b = Mutex::new(5);let thread_a = thread::spawn(move || {// 線程 A 先鎖定 mutex_alet mut a: MutexGuard<i32> = mutex_a.lock().unwrap();println!("線程 A 獲取了 mutex_a");// 睡眠一段時間,讓線程 B 有機會鎖定 mutex_bthread::sleep(Duration::from_millis(100));// 線程 A 嘗試鎖定 mutex_bprintln!("線程 A 嘗試獲取 mutex_b");let mut b: MutexGuard<i32> = mutex_b.lock().unwrap();*a += *b;});let thread_b = thread::spawn(move || {// 線程 B 先鎖定 mutex_blet mut b: MutexGuard<i32> = mutex_b.lock().unwrap();println!("線程 B 獲取了 mutex_b");// 睡眠一段時間,讓線程 A 有機會鎖定 mutex_athread::sleep(Duration::from_millis(100));// 線程 B 嘗試鎖定 mutex_aprintln!("線程 B 嘗試獲取 mutex_a");let mut a: MutexGuard<i32> = mutex_a.lock().unwrap();*b += *a;});// 注意:這個例子會導致死鎖!
}

讀寫鎖(RwLock)

讀寫鎖允許多個讀取器或一個寫入器訪問數據:

use std::sync::RwLock;fn main() {let lock = RwLock::new(5);// 多個讀取器可以同時訪問數據{let r1 = lock.read().unwrap();let r2 = lock.read().unwrap();println!("讀取器: {} {}", r1, r2);} // 讀鎖在這里被釋放// 只能有一個寫入器{let mut w = lock.write().unwrap();*w += 1;println!("寫入后: {}", *w);} // 寫鎖在這里被釋放
}

原子類型

對于簡單的計數器和標志,可以使用原子類型,它們提供了無鎖的線程安全操作:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;fn main() {let counter = Arc::new(AtomicUsize::new(0));let mut handles = vec![];for _ in 0..10 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {for _ in 0..1000 {counter.fetch_add(1, Ordering::SeqCst);}});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("結果: {}", counter.load(Ordering::SeqCst));
}

Ordering 參數指定內存順序約束:

  • Relaxed:最寬松的順序,只保證原子性
  • Release:寫操作使用,確保之前的操作不會被重排到此操作之后
  • Acquire:讀操作使用,確保之后的操作不會被重排到此操作之前
  • AcqRel:結合了 AcquireRelease
  • SeqCst:最嚴格的順序,提供全序關系

條件變量

條件變量允許線程等待某個條件變為真:

use std::sync::{Arc, Mutex, Condvar};
use std::thread;fn main() {let pair = Arc::new((Mutex::new(false), Condvar::new()));let pair2 = Arc::clone(&pair);thread::spawn(move || {let (lock, cvar) = &*pair2;let mut started = lock.lock().unwrap();println!("改變條件變量之前");*started = true;// 通知等待的線程cvar.notify_one();println!("條件變量已改變");});let (lock, cvar) = &*pair;let mut started = lock.lock().unwrap();// 等待條件變為真while !*started {started = cvar.wait(started).unwrap();}println!("條件已滿足,主線程繼續執行");
}

屏障(Barrier)

屏障確保多個線程在某一點同步:

use std::sync::{Arc, Barrier};
use std::thread;fn main() {let mut handles = Vec::with_capacity(10);let barrier = Arc::new(Barrier::new(10));for i in 0..10 {let b = Arc::clone(&barrier);handles.push(thread::spawn(move || {println!("線程 {} 開始工作", i);// 模擬工作thread::sleep(std::time::Duration::from_millis(i * 100));println!("線程 {} 到達屏障", i);// 等待所有線程到達屏障b.wait();println!("線程 {} 繼續執行", i);}));}for handle in handles {handle.join().unwrap();}
}

線程池

創建線程有開銷,線程池可以重用線程,提高性能:

use std::sync::{mpsc, Arc, Mutex};
use std::thread;struct ThreadPool {workers: Vec<Worker>,sender: mpsc::Sender<Job>,
}type Job = Box<dyn FnOnce() + Send + 'static>;impl ThreadPool {fn new(size: usize) -> ThreadPool {assert!(size > 0);let (sender, receiver) = mpsc::channel();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(size);for id in 0..size {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { workers, sender }}fn execute<F>(&self, f: F)whereF: FnOnce() + Send + 'static,{let job = Box::new(f);self.sender.send(job).unwrap();}
}struct Worker {id: usize,thread: thread::JoinHandle<()>,
}impl Worker {fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {let thread = thread::spawn(move || loop {let job = receiver.lock().unwrap().recv().unwrap();println!("工作線程 {} 獲得了一個任務", id);job();});Worker { id, thread }}
}fn main() {let pool = ThreadPool::new(4);for i in 0..8 {pool.execute(move || {println!("執行任務 {}", i);thread::sleep(std::time::Duration::from_secs(1));println!("任務 {} 完成", i);});}// 給線程池一些時間來處理任務thread::sleep(std::time::Duration::from_secs(10));
}

并發最佳實踐

1. 優先使用消息傳遞

當可能時,優先使用消息傳遞而不是共享狀態:

use std::sync::mpsc;
use std::thread;fn main() {let (tx, rx) = mpsc::channel();// 啟動工作線程for i in 0..4 {let tx = tx.clone();thread::spawn(move || {// 模擬工作let result = i * i;tx.send(result).unwrap();});}// 丟棄原始發送者drop(tx);// 收集結果let mut results = Vec::new();for received in rx {results.push(received);}println!("結果: {:?}", results);
}

2. 使用適當的同步原語

根據需求選擇合適的同步原語:

  • 對于簡單計數器:使用 AtomicUsize
  • 對于需要獨占訪問的數據:使用 Mutex
  • 對于讀多寫少的數據:使用 RwLock
  • 對于一次性初始化:使用 lazy_staticOnceCell/OnceLock

3. 避免過度同步

過度同步會導致性能下降:

use std::sync::{Arc, Mutex};
use std::thread;// 不好的做法:鎖的粒度太大
fn process_data_bad(data: &[i32]) -> i32 {let result = Arc::new(Mutex::new(0));let mut handles = vec![];for chunk in data.chunks(100) {let result = Arc::clone(&result);let chunk = chunk.to_vec();handles.push(thread::spawn(move || {// 計算和let sum: i32 = chunk.iter().sum();// 獲取鎖并更新結果let mut result = result.lock().unwrap();*result += sum;}));}for handle in handles {handle.join().unwrap();}*result.lock().unwrap()
}// 好的做法:減少鎖的競爭
fn process_data_good(data: &[i32]) -> i32 {let mut handles = vec![];for chunk in data.chunks(100) {let chunk = chunk.to_vec();handles.push(thread::spawn(move || {// 計算和并返回chunk.iter().sum::<i32>()}));}// 收集結果let mut final_result = 0;for handle in handles {final_result += handle.join().unwrap();}final_result
}

4. 使用線程局部存儲

對于每個線程需要獨立狀態的情況,使用線程局部存儲:

use std::cell::RefCell;
use std::thread;
use std::thread_local;thread_local! {static COUNTER: RefCell<u32> = RefCell::new(0);
}fn main() {let mut handles = vec![];for _ in 0..5 {handles.push(thread::spawn(|| {for _ in 0..10 {COUNTER.with(|c| {*c.borrow_mut() += 1;});}let result = COUNTER.with(|c| *c.borrow());println!("線程計數器: {}", result);}));}for handle in handles {handle.join().unwrap();}
}

5. 使用 parking_lot 庫

parking_lot 庫提供了更高性能的同步原語:

// 在 Cargo.toml 中添加:
// [dependencies]
// parking_lot = "0.12.0"use parking_lot::{Mutex, RwLock};
use std::sync::Arc;
use std::thread;fn main() {let counter = Arc::new(Mutex::new(0));let mut handles = vec![];for _ in 0..10 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {let mut num = counter.lock();*num += 1;});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("結果: {}", *counter.lock());
}

練習題

  1. 實現一個并發計數器,使用不同的同步原語(MutexRwLockAtomicUsize)并比較它們的性能。

  2. 創建一個簡單的生產者-消費者系統,其中多個生產者線程生成隨機數,多個消費者線程計算這些數字的平方并打印結果。

  3. 實現一個并發哈希表,允許多個線程同時讀取,但只允許一個線程寫入。使用適當的同步原語確保線程安全。

  4. 編寫一個程序,使用屏障(Barrier)同步多個線程,讓它們同時開始執行一個計算密集型任務,并測量完成時間。

  5. 實現一個簡單的線程池,可以提交任務并等待所有任務完成。包括一個優雅的關閉機制,確保所有任務都能完成。

總結

在本章中,我們探討了 Rust 的并發編程模型:

  • 線程基礎和所有權規則
  • 消息傳遞并發(通道)
  • 共享狀態并發(互斥鎖、讀寫鎖、原子類型)
  • 條件變量和屏障
  • 線程池實現
  • 并發編程最佳實踐

Rust 的類型系統和所有權規則使得并發編程更加安全,在編譯時就能捕獲許多常見的并發錯誤。通過選擇適當的并發模型和同步原語,你可以編寫高效、安全的并發代碼。在下一章中,我們將探索 Rust 的異步編程模型,它提供了一種更輕量級的并發方式。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/75567.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/75567.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/75567.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

算法訓練營第二十三天 | 貪心算法(一)

文章目錄 一、貪心算法理論基礎二、Leetcode 455.分發餅干二、Leetcode 376. 擺動序列三、Leetcode 53. 最大子序和 一、貪心算法理論基礎 貪心算法是一種在每一步選擇中都采取當前狀態下的最優決策&#xff0c;從而希望最終達到全局最優解的算法設計技術。 基本思想 貪心算…

css基礎-display 常用布局

CSS display 屬性詳解 屬性設置元素是否被視為塊級或行級盒子以及用于子元素的布局&#xff0c;例如流式布局、網格布局或彈性布局。 一、基礎顯示模式 1. block 作用&#xff1a; 元素獨占一行可設置寬高和內外邊距默認寬度撐滿父容器 應用場景&#xff1a; 布局容器&a…

速賣通API數據清洗實戰:從原始JSON到結構化商品數據庫

下面將詳細介紹如何把速賣通 API 返回的原始 JSON 數據清洗并轉換為結構化商品數據庫。 1. 數據獲取 首先要借助速賣通 API 獲取商品數據&#xff0c;以 Python 為例&#xff0c;可使用requests庫發送請求并得到 JSON 數據。 import requests# 替換為你的 API Key 和 Secret …

【零基礎入門unity游戲開發——2D篇】2D物理系統 —— 2D剛體組件(Rigidbody2D)

考慮到每個人基礎可能不一樣,且并不是所有人都有同時做2D、3D開發的需求,所以我把 【零基礎入門unity游戲開發】 分為成了C#篇、unity通用篇、unity3D篇、unity2D篇。 【C#篇】:主要講解C#的基礎語法,包括變量、數據類型、運算符、流程控制、面向對象等,適合沒有編程基礎的…

Collectors.toMap / list 轉 map

前言 略 Collectors.toMap List<User> userList ...; Map<Long, User> userMap userList.stream().collect(Collectors.toMap(User::getUserId, Function.identity()));假如id存在重復值&#xff0c;則會報錯Duplicate key xxx, 解決方案 兩個重復id中&#…

熱門面試題第13天|Leetcode 110.平衡二叉樹 257. 二叉樹的所有路徑 404.左葉子之和 222.完全二叉樹的節點個數

222.完全二叉樹的節點個數&#xff08;優先掌握遞歸&#xff09; 需要了解&#xff0c;普通二叉樹 怎么求&#xff0c;完全二叉樹又怎么求 題目鏈接/文章講解/視頻講解&#xff1a;https://programmercarl.com/0222.%E5%AE%8C%E5%85%A8%E4%BA%8C%E5%8F%89%E6%A0%91%E7%9A%84%E8…

關于Object.assign

Object.assign 基本用法 Object.assign() 方法用于將所有可枚舉屬性的值從一個或者多個源對象source復制到目標對象。它將返回目標對象target const target { a: 1, b: 2 } const source { b: 4, c: 5 }const returnedTarget Object.assign(target, source)target // { a…

GitHub高級篩選小白使用手冊

GitHub高級篩選小白使用手冊 GitHub 提供了強大的搜索功能&#xff0c;允許用戶通過高級篩選器來精確查找倉庫、Issues、Pull Requests、代碼等。下面是一些常用的高級篩選用法&#xff0c;幫助你更高效地使用 GitHub 搜索功能。 目錄 搜索倉庫搜索Issues搜索Pull Requests搜…

手動集成sqlite的方法

注意到sqlite有backup方法&#xff08;https://www.sqlite.org/backup.html&#xff09;。 也注意到android中sysroot下&#xff0c;沒有sqlite3的庫&#xff0c;也沒有相關頭文件。 如果要使用 sqlite 的backup&#xff0c;那么就需要手動集成sqlite代碼到項目中。可以如下操…

藍橋杯真題 2109.統計子矩陣

原題地址:1.統計子矩陣 - 藍橋云課 問題描述 給定一個 NMNM 的矩陣 AA, 請你統計有多少個子矩陣 (最小 1111, 最大 NM)NM) 滿足子矩陣中所有數的和不超過給定的整數 KK ? 輸入格式 第一行包含三個整數 N,MN,M 和 KK. 之后 NN 行每行包含 MM 個整數, 代表矩陣 AA. 輸出格…

藍橋杯—最少操作數

一.題目 分析:每次可以進行三次操作&#xff0c;求在n步操作后可以達到目標數的最小n&#xff0c;和最短路徑問題相似&#xff0c;分層遍歷加記憶化搜索防止時間復雜度過高&#xff0c;還需要減枝操作 import java.util.HashSet; import java.util.LinkedList; import java.ut…

Linux內核NIC網卡驅動實戰案例分析

以下Linux 內核模塊實現了一個虛擬網絡設備驅動程序&#xff0c;其作用和意義如下&#xff1a; 1. 作用 &#xff08;1&#xff09;創建虛擬網絡設備對 驅動程序動態創建了兩個虛擬網絡設備&#xff08;nic_dev[0]和nic_dev[1]&#xff09;&#xff0c;模擬物理網卡的功能。這兩…

Trae初使用心得(Java后端)

1.前提 2025年3月3日&#xff0c;字節跳動正式官宣“中國首個 AI 原生集成開發環境&#xff08;AI IDE&#xff09;”Trae 國內版正式上線&#xff0c;由于之前項目的原因小編沒有及時的去體驗&#xff0c;這幾日專門抽空去體驗了一下感覺還算可以。 2.特點 Trade重在可以白嫖…

[項目]基于FreeRTOS的STM32四軸飛行器: 十二.角速度加速度濾波

基于FreeRTOS的STM32四軸飛行器: 十二.濾波 一.濾波介紹二.對角速度進行一階低通濾波三.對加速度進行卡爾曼濾波 一.濾波介紹 模擬信號濾波&#xff1a; 最常用的濾波方法可以在信號和地之間并聯一個電容&#xff0c;因為電容通交隔直&#xff0c;信號突變會給電容充電&#x…

UNIX網絡編程筆記:TCP、UDP、SCTP編程的區別

一、核心特性對比 特性TCPUDPSCTP連接方式面向連接&#xff08;三次握手&#xff09;無連接面向連接&#xff08;四次握手&#xff09;可靠性可靠傳輸&#xff08;重傳、確認機制&#xff09;不可靠傳輸可靠傳輸&#xff08;多路徑冗余&#xff09;傳輸單位字節流&#xff08;…

Python爬蟲異常處理:自動跳過無效URL

爬蟲在運行過程中常常會遇到各種異常情況&#xff0c;其中無效URL的出現是較為常見的問題之一。無效URL可能導致爬蟲程序崩潰或陷入無限等待狀態&#xff0c;嚴重影響爬蟲的穩定性和效率。因此&#xff0c;掌握如何在Python爬蟲中自動跳過無效URL的異常處理技巧&#xff0c;對于…

C++語法學習的主要內容

科技特長生方向&#xff0c;主要學習的內容為 一&#xff0c;《C語法》 二&#xff0c;《數據結構》 三&#xff0c;《算法》 四&#xff0c;《計算機基礎知識》 五&#xff0c;《初高中的數學知識》 其中&#xff0c;《C語法》學習的主要內容如下: 1,cout輸出語句和鍵盤…

3、孿生網絡/連體網絡(Siamese Network)

目的: 用Siamese Network (孿生網絡) 解決Few-shot learning (小樣本學習)。 Siamese Network并不是Meta Learning最好的方法, 但是通過學習Siamese Network,非常有助于理解其他Meta Learning算法。 這里介紹了兩種方法:Siamese Network (孿生網絡)、Trplet Loss Siam…

從零構建大語言模型全棧開發指南:第二部分:模型架構設計與實現-2.2.1從零編寫類GPT-2模型架構(規劃模塊與代碼組織)

?? 點擊關注不迷路 ?? 點擊關注不迷路 ?? 點擊關注不迷路 文章大綱 2.2.1 從零編寫類GPT-2模型架構(規劃模塊與代碼組織)1. 模型架構設計規劃1.1 架構核心組件2. 模塊化設計實現2.1 輸入處理模塊2.1.1 分詞與嵌入2.1.2 位置編碼2.2 解碼塊設計2.2.1 多頭注意力子層2.2.…

消息隊列(Kafka及RocketMQ等對比聯系)

目錄 消息隊列 一、為什么使用消息隊列&#xff1f;消息隊列有什么優點/缺點&#xff1f;介紹下Kafka、ActiveMQ、RabbitMQ、RocketMQ有什么優點缺點&#xff0c;如何取舍&#xff1f; 1.公司業務場景是什么&#xff0c;這個業務場景有什么挑戰&#xff0c;如果不用MQ有什么麻…