創建線程(thread)
#include<iostream>
#include<thread>
using namespace std;// 函數fun,接收一個整型參數并在無限循環中打印遞增的值
void fun(int a) {while(1) {cout << ++a << "\n"; // 打印自增后的athis_thread::sleep_for(chrono::microseconds(50)); // 線程休眠50微秒}
}int main() {int a = 0; // 初始化變量a為0thread t1(fun, a); // 創建線程t1,并啟動,執行fun函數,傳入a作為參數cout << t1.get_id() << endl; // 獲取并打印線程t1的ID// t1.detach(); // 線程與主線程分離,獨立運行t1.join(); // 等待線程t1執行完畢后再繼續執行主線程的后續代碼return 0; // 返回0,程序結束
}
互斥量(mutex),原子變量(atomic)
使用mutex互斥量
#include<iostream>
#include<thread>
#include<mutex>
using namespace std;mutex mtx; // 定義互斥量int gg = 0; // 全局變量 gg,作為共享資源void fun() {int t = 1000;while (t--) {mtx.lock(); // 上鎖++gg; // 修改共享資源--gg; // 修改共享資源mtx.unlock(); // 解鎖}
}int main() {thread t1(fun); // 創建線程 t1thread t2(fun); // 創建線程 t2t1.join(); // 等待線程 t1 結束t2.join(); // 等待線程 t2 結束cout << gg; // 輸出共享資源 gg 的值return 0;
}
多個鎖的情況
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;mutex mtx1, mtx2; // 定義兩個互斥量int gg = 0;void fun() {int t = 1000;while (t--) {// 同時對兩個互斥量上鎖lock(mtx1, mtx2);++gg; // gg自增--gg; // gg自減mtx1.unlock(); // 解鎖第一個互斥量mtx2.unlock(); // 解鎖第二個互斥量}
}int main() {thread t1(fun); // 創建線程t1thread t2(fun); // 創建線程t2t1.join(); // 等待線程t1結束t2.join(); // 等待線程t2結束cout << gg; // 輸出gg的值return 0;
}
實際開發中不會直接使用互斥量,而是搭配模板lock_guard使用,或者搭配功能更多的模板unique_lock使用
#include <iostream>
#include <thread>
#include <mutex>using namespace std;int gg = 0;
mutex mtx1;
mutex mtx2;void fun() {int t = 1000;while (t--) {lock_guard<mutex> lock1(mtx1); // 使用lock_guard自動管理鎖,作用域結束自動解鎖lock_guard<mutex> lock2(mtx2); // 同時鎖定兩個mutex,確保線程安全性++gg;--gg;}
}int main() {thread t1(fun);thread t2(fun);t1.join();t2.join();cout << gg; // 輸出最終的gg值,理論上應該為0,因為++gg和--gg成對出現return 0;
}
使用原子變量
#include <iostream>
#include <thread>
#include <atomic>
using namespace std;atomic<int> gg = 0; // 原子變量 ggvoid fun() {int t = 1000;while (t--) {++gg; // 原子操作:自增--gg; // 原子操作:自減}
}int main() {thread t1(fun); // 創建線程 t1 執行 fun 函數thread t2(fun); // 創建線程 t2 執行 fun 函數t1.join(); // 等待線程 t1 執行完畢t2.join(); // 等待線程 t2 執行完畢cout << gg; // 輸出原子變量 gg 的最終值return 0;
}
條件變量(condition_variable),信號量(semaphore)
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
using namespace std;mutex mtx; // 互斥量,用于保護共享資源
queue<int> q; // 共享隊列,生產者向其中推送數據,消費者從中取出數據
condition_variable cv; // 條件變量,用于線程之間的同步void producer() {int i = 0;while (true) {unique_lock<mutex> lock(mtx); // 加鎖,保護共享資源q.push(i); // 向隊列中推送數據cout << "push: " << i << endl; // 打印推送的數據cv.notify_one(); // 喚醒一個等待的消費者線程// cv.notify_all(); // 喚醒所有等待的消費者線程if (i < 9999)++i;elsei = 0;}
}void consumer() {int data = 0;while (true) {unique_lock<mutex> lock(mtx); // 加鎖,保護共享資源while (q.empty())cv.wait(lock); // 等待直到隊列非空,解鎖互斥量并等待條件變量通知后重新加鎖data = q.front(); // 獲取隊列頭部數據q.pop(); // 彈出隊列頭部數據cout << "pop: " << data << '\n';// 打印彈出的數據}
}int main() {thread t1(producer); // 創建生產者線程thread t2(consumer); // 創建消費者線程t1.join(); // 等待生產者線程結束t2.join(); // 等待消費者線程結束return 0;
}
信號量(semaphore)只在C++20之后的標準有(了解)
#include <iostream>
#include <thread>
#include <semaphore> // 包含信號量的頭文件
using namespace std;counting_semaphore<3> csem(0); // 定義一個初始計數為0的計數信號量,最多允許3個線程同時訪問
binary_semaphore bsem(0); // 定義一個初始計數為0的二進制信號量,相當于一次只允許一個線程訪問void task() {cout << "線程開始等待信號量\n";csem.acquire(); // 線程等待并獲取計數信號量cout << "線程獲取到信號量,繼續執行\n";
}int main() {thread t0(task); // 創建線程 t0 執行 task 函數thread t1(task); // 創建線程 t1 執行 task 函數thread t2(task); // 創建線程 t2 執行 task 函數thread t3(task); // 創建線程 t3 執行 task 函數thread t4(task); // 創建線程 t4 執行 task 函數cout << "主線程釋放信號量\n";csem.release(2); // 主線程釋放2個信號量,喚醒等待的線程,因為初始設置是0,也就是要喚醒兩個線程t0.join(); // 等待線程 t0 執行完畢t1.join(); // 等待線程 t1 執行完畢t2.join(); // 等待線程 t2 執行完畢t3.join(); // 等待線程 t3 執行完畢t4.join(); // 等待線程 t4 執行完畢return 0;
}
promise future
#include <future>
#include <iostream>
#include <thread>
using namespace std;// 定義一個任務函數,接受一個整數參數和一個promise引用
void task(int a, promise<int> &r)
{// 將計算結果設置到promise中r.set_value(a + a);
}int main()
{// 創建一個promise對象,用于在任務函數中設置值promise<int> p;// 從promise中獲取future對象,用于獲取任務函數的返回值future<int> f = p.get_future();// 創建一個線程,執行任務函數task,并傳遞參數1和promise對象p的引用thread t(task, 1, ref(p));/*在此處可以進行其他操作*/// 輸出future對象的值,注意:future的get方法只能調用一次cout << f.get();/*如果需要多次訪問future對象的值,可以使用shared_futureshared_future<int> s_f = f.share();這樣可以直接值傳遞,而不是引用傳遞*/// 等待線程執行完成t.join();return 0;
}
std::packaged_task std::async
async
#include <future>
#include <iostream>
using namespace std;// 定義一個函數,計算兩個整數的和
int task(int a, int b)
{return a + b;
}int main()
{// 創建一個 future 對象,用 async 異步調用 task 函數,并傳入參數 1 和 2future<int> fu = async(task, 1, 2);// 相當于 future<int> fu = async(launch::async|launch::deferred, task, 1, 2);// launch::async 會啟動一個新線程執行任務// launch::deferred 會延遲調用任務,在需要獲取結果時才調用// launch::async|launch::deferred 根據具體情況自動選擇// 輸出 future 對象的結果,使用 get() 函數獲取異步任務的返回值cout << fu.get();return 0;
}
packaged_task
#include <future>
#include <iostream>
using namespace std;// 定義一個任務,計算兩個整數的和
int task(int a, int b)
{return a + b;
}int main()
{// 創建一個打包任務,將函數 task 綁定到 packaged_taskpackaged_task<int(int, int)> t(task);// 執行任務,傳入參數 1 和 2t(1, 2);// 獲取任務的未來對象,并獲取結果cout << t.get_future().get();return 0;
}
bind
#include <future>
#include <iostream>
using namespace std;// 定義一個普通函數,返回兩個整數的和
int task(int a, int b)
{return a + b;
}int main()
{// 使用bind將函數task綁定到a,返回一個std::function對象auto a = bind(task, 1, 2); // 返回的是std::function// 調用a,計算綁定的函數結果int ret = a();cout << ret << endl;// 使用packaged_task封裝a,packaged_task是一個可調用對象的包裝器packaged_task<int()> t(a);t(); // 執行packaged_task,實際調用綁定的函數task// 獲取packaged_task的future,等待任務完成并獲取結果cout << t.get_future().get(); // 輸出任務的結果return 0;
}
異步線程池
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <iostream>// 建議使用支持C++14以上的編譯器以支持所有特性
class ThreadPool {
public:// 構造函數, 初始化大小ThreadPool(size_t);// typename std::result<F(Args...)> -> 編譯期推斷返回類型// 可以使用auto代替,自動推斷返回類型template<class F, class... Args>auto enqueue(F &&f, Args &&... args)-> std::future<typename std::result_of<F(Args...)>::type>;// 析構函數~ThreadPool();private:// need to keep track of threads so we can join them// 線程池的工作線程數std::vector<std::thread> workers;// the task queue// 任務隊列 函數應該被包裝為void(void) 的taskstd::queue<std::function<void()> > tasks;// synchronization// 同步工具// 互斥鎖和條件變量// stop變量檢測是否關閉線程池,可以使用atomic<bool>代替std::mutex queue_mutex;std::condition_variable condition;bool stop;
};// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads): stop(false) {// 創建threads個新線程塞進線程池// 使用std::move 右值傳遞for (size_t i = 0; i < threads; ++i)workers.emplace_back( // 相當于 push_back(std::move(...))[this] { // lambda函數,將class的成員變量以指針(引用)形式傳遞進去,*this則是以拷貝形式傳遞for (;;) {// worker函數不斷輪詢,競爭任務// 創建一個任務包裝,以存放將要完成的taskstd::function<void()> task;{// 訪問臨界區需要上鎖std::unique_lock<std::mutex> lock(this->queue_mutex);// 若隊列不為空或者需要stop,則喚醒workerthis->condition.wait(lock,[this] { return this->stop || !this->tasks.empty(); });// 若有停止信號或者隊列為空,則直接返回if (this->stop && this->tasks.empty())return;// 獲取任務,使用右值task = std::move(this->tasks.front());// 彈出在工作的任務this->tasks.pop();}// 執行任務task();// 完成后繼續從task隊列中提取任務}});
}// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F &&f, Args &&... args)
// 下面的推斷可以使用auto
-> std::future<typename std::result_of<F(Args...)>::type> {// 使用萃取的方法獲取返回值類型using return_type = typename std::result_of<F(Args...)>::type;// 將任務包裝成異步函數指針,封裝為shared_ptr 完后后自動回收,不造成內存泄漏// 而且在后續的lambda函數中可以直接傳遞函數指針然后執行// 使用packaged_task<>,函數綁定std::bind,和完美轉發std::forward// 包裝需要執行的函數,然后在后臺進行異步執行auto task = std::make_shared<std::packaged_task<return_type()> >(std::bind(std::forward<F>(f), std::forward<Args>(args)...));// 綁定異步函數task的返回值到future res中std::future<return_type> res = task->get_future();{// 在匿名作用域中使用unique_lock// 減小鎖的粒度,出了匿名作用區鎖就被釋放std::unique_lock<std::mutex> lock(queue_mutex);// don't allow enqueueing after stopping the pool// 防止在停止后放入任務if (stop)throw std::runtime_error("enqueue on stopped ThreadPool");// 將匿名函數包裝到lambda函數void()中// task是函數指針(即函數的地址),所以拷貝傳遞可以執行tasks.emplace([task]() { (*task)(); });}// 喚醒一個workercondition.notify_one();return res;
}// the destructor joins all threads
inline ThreadPool::~ThreadPool() {{// 此處使用atomic<bool>顯得更加方便std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();// join后自動銷毀回收for (std::thread &worker: workers)worker.join();
}int main() {ThreadPool pool(4);std::vector<std::future<int> > results;results.reserve(8);for (int i = 0; i < 8; ++i) {results.emplace_back(pool.enqueue([i] {std::cout << "hello " << i << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "world " << i << std::endl;return i * i;}));}for (auto &&result: results)std::cout << result.get() << ' ';std::cout << std::endl;return 0;
}