原子操作
- 原子操作原理
- 什么是原子操作?
- 原子性
- 原子變量相關接口
- 內存序
- shared_ptr的實現
原子操作原理
什么是原子操作?
原子操作其實就是指在多線程的環境下,確保對共享變量的操作不會被干擾,從而避免了競態條件。
我們都知道,在多線程環境下,對于共享變量的操作是存在線程安全的問題的,假設當前存在一個全局變量 count ,此時創建多個線程對 count 進行 ++ 操作,就會誘發線程安全的問題,對于 count 這種臨界資源來說,表面上我們看著只有一句代碼,但是解釋成匯編語句他就會存在多個操作:
多線程環境下就存在競態條件,多個線程去競爭 count 這一個臨界資源,對于單條匯編語句,他的操作必定是原子性的,但是在多條匯編語句下,隨著線程的切換, count 變量的操作就充滿了不確定性。
而原子變量就可以有效的解決掉這個問題,他保證了對于 count 這個變量的操作,比如說現在有兩個線程,線程 A 對 count 變量操作的時候,他的操作是不可被打斷的,只有當線程 A 對 count 變量操作完成以后,線程 B 才會去獲取到這個資源,進行他的操作,這就保證了 count 操作的原子性。
原子性
理解原子性,我們首先的從 CPU 的存儲體系架構下手,在當前這個多處理器多核心工作的架構下,必定會存在 CPU 資源競爭的發生,因為我們的線程就是在核心上運行的,競態條件的發生就會意味著切換,就會有線程安全的情況發生。
首先我們來了解一下CPU 的存儲體系架構,如下圖所示:
因為在當前時代下,CPU 的讀取速度時遠高于主存的,就出現了緩存(cache)的概念,我們可以理解為他是 CPU 與主存之間的一種存儲結構,他的讀取速度高于主存略低于 CPU ,主要分為 3 級,分為 L1,L2,L3,前兩級 L1,L2 是 CPU 核心所獨有,L3 是所有 CPU 核心共享的。
原子性就是指在當前核心上的操作的中間狀態并不被其他核心所識別到,就如下圖所示:
而我們有如何做到這種狀態呢?
如果是單處理器單核心的情況下,就比較簡單,因為就算是創建多個線程,每次也只會允許一個線程進行運轉,主要我們屏蔽掉中斷機制,使用底層自旋鎖,我們就可以去做到這種原子性的操作,但是注意,這只是在單處理器單核心下,當前開發環境大多數都是在多處理器多核心的情況下。
在多處理器多核心的場景下,就會存在 CPU 資源被爭奪的情況產生,要保證原子性,就需要保證其他的核心不去操作當前核心操作的內存空間,對于L1,L2緩存來說,當前核心獨有,我們并不需要做過多的操作,但是對于 L3 緩存來說,是所有核心所共有的,在我們當前的工作模式下,有一種總線嗅探機制,就是最終的數據會通過總線傳遞給其他的核心。
以往 0x86 是通過鎖總線的方式,避免所有內存的訪問,但是這就會造成一個問題,就是效率低下的問題,只有當前核心可以工作,其他核心都被阻攔,明顯效率就變低了。現在采用的是阻止其他核心對相關的內存空間訪問。
CPU 如何讀寫數據
首先我們來看寫策略,對于寫策略,通常會存在以下兩種處理方式:
- 直寫:這種策略其實很簡單,當 CPU 需要寫入數據時,此時就會將數據同時都寫入到 cache 和主存當中,也就意味著其他核心每一次讀取到的數據都是最新的,就不會出現不同步的問題。
- 寫回:當 CPU 需要寫入數據時,此時并不是將數據立即就寫入到主存當中,而是先寫入緩存當中,緩存當中是以 cache line 去保存數據的,其中有一個標記位,我們寫入數據以后,如果發生緩存命中,此時就可以直接寫,如果沒有命中,就會去緩存中定位一個數據塊,將數據進行寫入,同時標記標志位為 dirty 臟數據。
- 當我們在緩存中定位數據塊是會出現兩種情況,一種是緩存中所有的數據塊都已經被標記,沒有空閑的,此時就會就會通過 LRU 策略(最久沒有被使用)去淘汰掉一個塊(刷回主存中),然后將當前數據進行寫入,另一種就是存在空閑的塊,此時就會將數據寫入,然后標記為臟數據。
- 注意, LRU 策略下并不是將數據清除掉,而是回檢查當前數據塊標志位是否為 dirty ,如果是 dirty ,就會將其刷入主存當中,也就是說這種場景下,我們進行寫操作,本身我們寫入的是 i 的數據,但是當前我們 LRU 需要去替換掉的是 g 的 cache line,并且當前 cache line 的標記位本身為 dirty,就會將 g 的數據先刷新到主存當中,然后在寫入 i 的數據,這就是寫回策略。
接下來看讀策略:
- 讀策略相對來說更加容易理解,CPU 在進行數據讀取時,首先會去緩存中讀取,如果命中,就會直接讀取到,如果沒有命中,此時也是需要定位緩存塊的,如果定位到非臟的緩存塊,就直接從主存中將該數據刷入到緩存塊當中,標記為臟數據;
- 如果未命中,同樣的道理,如果當前 cache line 標記位為臟數據,會先將當前 cache line 的數據刷新會主存,然后從主存中讀取我們需要的數據到當前緩存中,標記為非臟數據,因為我們當前并沒有寫操作發生。
緩存不一致問題
了解了 CPU 讀寫數據的操作以后,我們不難發現,在多核多處理器下就會出現緩存不一致的問題,試想一下,有一個多核心的 CPU,其中核心 A 和核心 B 都需要訪問內存中的同一個數據 X 。一開始,數據 X 被加載到核心 A 和核心 B 各自的緩存中 。當核心 A 對緩存中的數據 X 進行修改時,此時核心 A 緩存中的數據 X 已經更新,但是有 CPU 寫回策略的存在,核心 B 當中數據依然是舊的,此時核心 B 進行讀操作,讀取的就是舊的數據,那么就會出現緩存不一致的問題。
基于寫回策略的影響,就出現了緩存一致性的問題,在設計的過程中為了避免這種問題,就出現了我們最開始所說的總線嗅探方案,每個 CPU 核心都與總線相連接,總線上一旦發生一些變化,就會被 CPU 嗅探到,當一個CPU核心對自己緩存中的數據進行寫操作時,它會向總線發送一個寫請求,這個請求包含了被修改數據的地址等信息 。
但是這種方式的問題就在于,寫操作是以廣播的方式發送出去的,每個核心都可以收到,有一些核心并不需要當前的數據,他也會收到,這樣就會造成帶寬的壓力,所以,在這兒也進行了優化,確保我們當前的修改只有需要的核心會去訪問到,不需要的完全可以不用去管,這樣就減少了帶寬的壓力。
當然,在這種總線嗅探機制下,就會存在數據接收快慢的問題,因為數據的傳輸是存在順序的,有的核心離得近,有的核心離得遠,我們必須保證當前的數據修改被每一個需要核心所接收到,并且在這個過程中數據并不會被更改,在這塊兒就會存在一個 lock 指令讓其基于嗅探機制實現事務的串行化,保證每一個核心所接收到的都是最新的數據。
緩存一致性協議 MESI
MESI 協議是一個基于失效的緩存一致性協議,支持 write-back 寫回緩存的常用協議。
主要原理:通過總線嗅探策略(將讀寫請求通過總線廣播給所有核心,核心根據本地狀態進行響應)。
MESI 協議存在四種狀態:
- Modified(M):某數據已修改但是沒有同步到內存中。如果其他核心要讀該數據,需要將該數據從緩存同步到內存中,并將狀態轉為 ;
- Exclusive(E):某數據只在該核心當中,此時緩存和內存中的數據一致;
- Shared(S):某數據在多個核心中,此時緩存和內存中的數據一致;
- Invaliddate(I):某數據在該核心中以失效,不是最新數據。
通過以上的知識介紹,我們就可以理解原子性以及原子變量的原理有一個初步的了解,接下來我們來看一下關于原子變量的一些接口調用:
原子變量相關接口
store(T desired, std::memory_order order)
:用于將指定的值存儲到原子對象中;load(std::memory_order order)
:用于獲取原子變量的當前值;exchange(std::atomic<T>* obj, T desired)
:訪問和修改包含的值,將包含的值替換并返回它前面的值。如果替換成功,則返回原來的值。compare_exchange_weak(T& expected, T val, memory_order success, memory_order failure)
:比較一個值和一個期望值是否相等,如果相等則將該值替換成一個新值,并返回 true;否則不做任何操作并返回 false。注意,compare_exchange_weak
函數是一個弱化版本的原子操作函數,因為在某些平臺上它可能會失敗并重試。如果需要保證嚴格的原子性,則應該使用 compare_exchange_strong 函數。fetch_add,fetch_sub,fetch_and,fetch_or,fetch_xor
:這都是一些基于運算的操作。
這些函數仔細觀察都會存在一個特點,我們會發現存在一個 mem_order ,這個代碼的就是內存序,內存序定義了原子操作之間的可見性關系和順序約束,直接影響程序的線程安全性。
內存序
我們平時的代碼都點我們的邏輯順序去寫的,但是對于 CPU 和編譯器來說,是會對代碼進行優化的,存在指令重排,比說下面這段代碼:
int main()
{int i = 0;int j = 1;i += 1;j += 2;i += 3;return 0;
}
對于 CPU 和編譯器來說,他是可以不按照我們的代碼邏輯來進行加載的,比如說上面對于 i 的操作就會加載在一起,然后先操作,然后再加載對 j 的操作,這也是 CPU 的局部性原理所決定的,這樣的優化可以去提高程序的運行效率。
在多線程的環境下,CPU 的切換就會破壞邏輯,我們會進行加鎖操作,而我們的加鎖操作就是在控制內存序,讓 CPU 和編譯器不再去進行優化,這也就能解釋為什么頻繁的鎖操作會影響程序的運行效率,因為這樣并不會讓 CPU 和編譯器對程序進行優化。
內存序規定了多個線程訪問同一個地址時的語義,他決定了某個線程對內存的操作何時能被其他線程所看見,某個線程對內存附近的訪問可以做到怎樣的優化。
C++ 標準定義了 6 中內存序:
memory_order_relaxed
memory_order_relaxed 所代表是松散內存序,只用來保證對原子對象的操作是原子的,在不需要保證順序時使用,這也就意味著他只保證當前的操作是原子的,對于代碼邏輯隨便 CPU 和編譯器怎么去優化都可以,我只要保證在我操作時其他線程不會操作就可以了,就如下圖所示:
他就是值當前操作前面的代碼是可以優化到后面去的,后面的代碼也是可以優化到前面去的,但是當前不確定性就增加了,效率卻是最高的。
#include <atomic>
#include <thread>
#include <iostream>std::atomic<int> x{0};void thread_func1()
{for (int i = 0; i < 100000; ++i){x.store(i, std::memory_order_relaxed);}
}void thread_func2()
{for (int i = 0; i < 100000; ++i){x.store(-i, std::memory_order_relaxed);}
}int main()
{std::thread t1(thread_func1);std::thread t2(thread_func2);t1.join();t2.join();std::cout << "Final value of x = " << x.load(std::memory_order_relaxed) << std::endl;return 0;
}
這段代碼我們就可以確定,結果要么就是 -9999 要么就是 9999 ,不會存在其他的值,只是線程的調度順序而已,因為當前操作是具有原子性的。
memory_order_release
memory_order_release 代表的是一個釋放操作,在寫入某原子對象時,當前線程的任何前面的讀寫操作都不允許重排到這個操作的后面去,并且當前線程的所有內存寫入都在對同一個原子對象進行獲取的其他線程可見;通常與 memory_order_acquire 或memory_order_consume 配對使用。
也就是說當前操作前面的操作可以任意優化,后面的操作也可以優化到前面來,但是當前操作前面的操作是不可以優化到后面去的,而且當前操作完成以后,數據同時會被刷新到 cache 和主存當中去,其他線程所讀取的就會是最新的數據。
memory_order_acquire
memory_order_acquire 是一種獲得操作,在讀取某原子對象時,當前線程的任何后面的讀寫操作都不允許重排到這個操作的前面去,并且其他線程在對同一個原子對象釋放之前的所有內存寫入都在當前線程可見。
也就是說當前操作前面的操作是可以優化到后面去的,但是當前操作后面的操作不可以優化到前面去,我們會先從主存當中去讀取數據,讀值緩存當中,我們每一次讀取到的數據都是最新的,通常與 memory_order_release 一起使用。
#include <atomic>
#include <thread>
#include <assert.h>
#include <iostream>std::atomic<bool> x,y;
std::atomic<int> z;// 提升效率
void write_x_then_y()
{x.store(true,std::memory_order_relaxed); // 1 y.store(true,std::memory_order_release); // 2
}void read_y_then_x()
{while(!y.load(std::memory_order_acquire)); // 3 自旋,等待y被設置為trueif(x.load(std::memory_order_relaxed)) // 4++z; // 會不會一定等于 1
}int main()
{x=false;y=false;z=0;std::thread a(write_x_then_y);std::thread b(read_y_then_x);a.join();b.join();std::cout << z.load(std::memory_order_relaxed) << std::endl;return 0;
}
我們看上面這段代碼,打印的結果一定是 1 ,原因就在于 3 操作必須等 2 操作完成以后才會結束,對于 3 操作來說,不會允許后面的操作被優化到她的前面,而對于 2 操作來說,不會允許它前面的操作被優化到后面去,也就是說完成 2 操作以后,1 操作必定被完成了,最終只可能是 1 。
memory_order_acq_rel
memory_order_acq_rel 是一個獲得釋放操作,一個讀‐修改‐寫操作同時具有獲得語義和釋放語義,即它前后的任何讀寫操作都不允許重排,并且其他線程在對同一個原子對象釋放之前的所有內存寫入都在當前線程可見,當前線程的所有內存寫入都在對同一個原子對象進行獲取的其他線程可見。
memory_order_seq_cst
順序一致性語義,對于讀操作相當于獲得,對于寫操作相當于釋放,對于讀‐修改‐寫操作相當于獲得釋放,是所有原子操作的默認內存序,并且會對所有使用此模型的原子操作建立一個全局順序,保證了多個原子變量的操作在所有線程里觀察到的操作順序相同,當然它是最慢的同步模型。
shared_ptr的實現
在我們的 C++11 中有很多地方會用到 shared_ptr , shared_ptr 很好的解決了我們內存泄漏的問題,它使用的是 RAII 的方法,又引入了引用計數的概念,支持多個對象去管理一個資源,引用計數控制著當前資源被多少個對象所持有,某個對象被銷毀時引用計數就進行--
操作,當引用計數為 0 時,就意味著該資源需要被釋放了。
shared_ptr 我們也是可以通過原子變量去進行實現的,在 shared_ptr 的內部存在對引用計數的 ++
和 --
操作,就必定需要保證線程安全的問題,就需要加鎖,但是使用原子變量的話可以是更優選,相對于加鎖來說,原子變量的效率更高。
我們要實現一個 shared_ptr ,就需要實現以下接口:
- 構造函數;
- 析構函數;
- 拷貝構造函數;
- 拷貝賦值運算符;
- 移動構造函數;
- 解引用箭頭運算符;
- 引用計數、原始指針、重置指針。
#ifndef __SHARED_PTR__
#define __SHARED_PTR__#include <iostream>
#include <atomic>template <typename T>
class Shared_ptr
{
public:// 智能指針是支持構造一個空對象的Shared_ptr() : ptr_(nullptr), count_(nullptr) {}// 構造函數,explicit修飾,防止被直接賦值explicit Shared_ptr(T *ptr) : ptr_(ptr), count_(ptr ? new std::atomic<std::size_t>(1) : nullptr){}// 析構函數~Shared_ptr(){release();}// 拷貝構造函數Shared_ptr(const Shared_ptr<T> &other) : ptr_(other.ptr_), count_(other.count_){if (count_){count_->fetch_add(1);}}// 賦值運算符重載, 需要注意,防止自己給自己拷貝Shared_ptr<T> &operator=(const Shared_ptr<T> &other){if (this != &other){release();ptr_ = other.ptr_;count_ = other.count_;if (count_){count_->fetch_add(1);}}return this;}// 移動構造函數// 使用 noexcept 修飾,代表當前函數不會存在異常,編譯器會生成高效代碼Shared_ptr(const Shared_ptr<T> &&other) noexcept : ptr_(other.ptr_), count_(other.count_){other.ptr_ = nullptr;other.count_ = nullptr;}// 移動運算符重載Shared_ptr<T> &operator=(const Shared_ptr<T> &&other) noexcept{if (this != &other){release();ptr_ = other.ptr_;count_ = other.count_;other.ptr_ = nullptr;other.count_ = nullptr;}return this;}// 解引用T &operator*() const{return *ptr_;}// ->T *operator->() const{return ptr;}// 獲取到引用計數size_t usecount() const{return count_ ? count_->load() : 0;}// 獲取裸指針T *get() const{return ptr_;}// 重置函數void reset(T *ptr = nullptr){release();ptr_ = ptr;count_ = ptr ? new std::atomic<std::size_t>(1) : 0;}private:void release(){if (count_ && count_->fetch_sub(1) == 1){delete ptr_;delete count_;}}private:std::atomic<int> *count_; // 引用計數T *ptr_;
};#endif
關于智能指針的原理可以參考之前的一篇文章:C++11之智能指針,這兒只是換了另一種方式去進行實現,沒有考慮定制刪除器與弱引用的問題,同樣,上面的操作也沒有對內存序進行設置,默認就是 memory_order_seq_cst,接下來我們來進行一下優化。
代碼中使用 fetch_add 的地方有兩處,這兩處位置使用 memory_order_relaxed 最為合適,因為我們只需要保證這個操作是原子性的即可,保證最終的值不會被影響,而 fetch_sub 使用 memory_order_acq_rel 最為合適,因為我們需要保證當前返回的是操作的原始值,我們必須保證所有操作不被重排,維持我們原始的代碼邏輯,而且我們也需要保證我們的變量是在為 0 時才進行釋放的。
所以最終代碼優化如下:
#ifndef __SHARED_PTR__
#define __SHARED_PTR__#include <iostream>
#include <atomic>template <typename T>
class Shared_ptr
{
public:// 智能指針是支持構造一個空對象的Shared_ptr() : ptr_(nullptr), count_(nullptr) {}// 構造函數,explicit修飾,防止被直接賦值explicit Shared_ptr(T *ptr) : ptr_(ptr), count_(ptr ? new std::atomic<std::size_t>(1) : nullptr){}// 析構函數~Shared_ptr(){release();}// 拷貝構造函數Shared_ptr(const Shared_ptr<T> &other) : ptr_(other.ptr_), count_(other.count_){if (count_){count_->fetch_add(1, std::memory_order_relaxed);}}// 賦值運算符重載, 需要注意,防止自己給自己拷貝Shared_ptr<T> &operator=(const Shared_ptr<T> &other){if (this != &other){release();ptr_ = other.ptr_;count_ = other.count_;if (count_){count_->fetch_add(1, std::memory_order_relaxed);}}return *this;}// 移動構造函數// 使用 noexcept 修飾,代表當前函數不會存在異常,編譯器會生成高效代碼Shared_ptr<T>(const Shared_ptr<T> &&other) noexcept : ptr_(other.ptr_), count_(other.count_){other.ptr_ = nullptr;other.count_ = nullptr;}// 移動運算符重載Shared_ptr<T> &operator=(const Shared_ptr<T> &&other) noexcept{if (this != &other){release();ptr_ = other.ptr_;count_ = other.count_;other.ptr_ = nullptr;other.count_ = nullptr;}return *this;}// 解引用T &operator*() const{return *ptr_;}// ->T *operator->() const{return ptr_;}// 獲取到引用計數size_t usecount() const{return count_ ? count_->load(std::memory_order_acquire) : 0;}// 獲取裸指針T *get() const{return ptr_;}// 重置函數void reset(T *ptr = nullptr){release();ptr_ = ptr;count_ = ptr ? new std::atomic<std::size_t>(1) : 0;}private:void release(){if (count_ && count_->fetch_sub(1, std::memory_order_acq_rel) == 1){delete ptr_;delete count_;}}private:std::atomic<std::size_t> *count_; // 引用計數T *ptr_;
};#endif
測試是否是線程安全的:
#include <iostream>
#include "shared_ptr.h"
#include <thread>
#include <vector>
#include <chrono>
#include <memory>void test_shared_ptr_thread_safety() {Shared_ptr<int> ptr(new int(42));// 創建多個線程,每個線程都增加和減少引用計數const int num_threads = 10;std::vector<std::thread> threads;for (int i = 0; i < num_threads; ++i) {threads.emplace_back([&ptr]() {for (int j = 0; j < 1000; ++j) {Shared_ptr<int> local_ptr(ptr);// 短暫暫停,增加線程切換的可能性std::this_thread::sleep_for(std::chrono::milliseconds(1));}});}// 等待所有線程完成for (auto& thread : threads) {thread.join();}// 檢查引用計數是否正確std::cout << "use_count: " << ptr.usecount() << std::endl;if (ptr.usecount() == 1) {std::cout << "Test passed: shared_ptr is thread-safe!" << std::endl;} else {std::cout << "Test failed: shared_ptr is not thread-safe!" << std::endl;}
}// 測試代碼
int main() {Shared_ptr<int> ptr1(new int(10));std::cout << "ptr1 use_count: " << ptr1.usecount() << std::endl; // 1{Shared_ptr<int> ptr2 = ptr1;std::cout << "ptr1 use_count: " << ptr1.usecount() << std::endl; // 2std::cout << "ptr2 use_count: " << ptr2.usecount() << std::endl; // 2}std::cout << "ptr1 use_count: " << ptr1.usecount() << std::endl; // 1Shared_ptr<int> ptr3(new int(20));ptr1 = ptr3;std::cout << "ptr1 use_count: " << ptr1.usecount() << std::endl; // 2std::cout << "ptr3 use_count: " << ptr3.usecount() << std::endl; // 2ptr1.reset();std::cout << "ptr1 use_count: " << ptr1.usecount() << std::endl; // 0std::cout << "ptr3 use_count: " << ptr3.usecount() << std::endl; // 1test_shared_ptr_thread_safety();return 0;
}
最終結果如下: