異步編程的概念
什么是異步?
- 異步編程是一種編程范式,允許程序在等待某些操作時繼續執行其它任務,而不是阻塞或等待這些操作完成。
異步編程vs同步編程?
- 在傳統的同步編程中,代碼按順序同步執行,每個操作需要等待前一個操作完成。這種方式在處理I/O操作、網絡請求、計算密集型任務時可能會導致程序的性能瓶頸。舉個例子,當程序需要從網上獲取數據時,需要等待數據返回后才能繼續執行,等待期間CPU可能處于空閑狀態,浪費了資源。
- 而異步編程不同,它允許程序在操作未完成期間繼續執行其它任務。例如,程序可以發起一個網絡請求,然后繼續執行其它操作,等到網絡請求完成時,再處理獲取的數據。
異步編程的優點?
- 提高性能,并發執行多個任務,更高效地利用CPU資源
- 提高響應速度,程序在等待某些操作完成時繼續響應用戶輸入,提高用戶體驗
實現異步編程
? ? ? ? 在介紹異步編程工具之前,我們先來回答一個問題:
為什么主線程無法直接獲取新線程的計算結果?
內存隔離性:線程擁有獨立的棧空間,新線程中局部變量的生命周期僅限于該線程。來看一下下面這個例子:
int res; // 定義在主線程中
// 引用傳遞res
thread t([&res]() {res = 42;
});
t.join();
cout << res << endl; // 可以輸出42,但存在風險!
這種方式看似可行,實則存在嚴重問題:
- 競態條件:如果沒有正確使用join,主線程可能會在t修改res執行前就讀取它
- 懸掛引用:如果res是局部變量且線程未及時完成,新線程可能訪問已銷毀的內存。
同步缺失問題:及時使用全局變量或堆內存,仍需手動同步:
mutex mtx;
int* res = new int(0); // 在堆上創建變量
thread t([&mtx, res](){unique_lock<mutex> lock(mtx);*res = 42;
});
// 主線程需要輪詢檢查res,效率極低
核心機制:std::future
? ? ? ? std::future提供了一種標準化的線程間通信機制,其核心原理是共享狀態,當異步任務完成后,結果會被寫入共享狀態,future通過檢查該狀態安全地傳遞結果。? ? ? ??
????????std::future是一個模板類,用于表示異步操作的結果,允許開發者在未來的某個時刻查詢異步操作的狀態、等待操作完成或獲取操作結果。通常我們不直接創建future對象 ,而是與std::async、std::packaged_task或std::promise配合使用。
任務啟動器:std::async
std::async是一種將任務與std::future關聯的簡單方法,創建并運行一個異步任務,并返回一個與該任務結果關聯的std::future對象。async的任務是否同步運行取決于傳遞的參數:
- std::launch::deferred:表明該函數會被延遲調用,知道future對象上調用get或wait方法才會開始執行任務
- std::launch::async:表明函數會在創建的新線程上運行
- std::launch::deferred|std::launch::async:內部由操作系統自動選擇策略
延遲調用:
#include <iostream>
#include <future>
#include <unistd.h>using std::cout;
using std::endl;int myadd(int num1, int num2)
{cout << "add start!" << endl;return num1 + num2;
}
int main()
{cout << "---------1----------" << endl;std::future<int> fut = std::async(std::launch::deferred, myadd, 1, 2);sleep(1);cout << "---------2----------" << endl;cout << fut.get() << endl;return 0;
}
執行結果:
不難發現,直到我們調用了get方法,才執行myadd函數
異步執行:
#include <iostream>
#include <future>
#include <unistd.h>using std::cout;
using std::endl;int myadd(int num1, int num2)
{cout << "add start!" << endl;return num1 + num2;
}
int main()
{cout << "---------1----------" << endl;std::future<int> fut = std::async(std::launch::async, myadd, 1, 2);sleep(1);cout << "---------2----------" << endl;cout << fut.get() << endl;return 0;
}
執行結果:
在調用之后創建的線程立即執行了myadd函數。
結果傳遞器:std::promise
????????std::promise是一個用于設置異步操作結果的機制。允許我們在一個線程中設置值或異常,然后再另一個線程中通過future對象檢索這些值或異常,通常與std::async、std::thread等結合使用,在異步操作中傳遞結果。
#include <iostream>
#include <thread>
#include <future>//通過在線程中對promise對象設置數據,其他線程中通過future獲取設置數據的方式實現獲取異步任務執行結果的功能
void Add(int num1, int num2, std::promise<int> &prom) {std::this_thread::sleep_for(std::chrono::seconds(3));prom.set_value(num1 + num2);return ;
}int main()
{std::promise<int> prom;std::future<int> fu = prom.get_future();std::thread thr(Add, 11, 22, std::ref(prom));int res = fu.get();std::cout << "sum: " << res << std::endl;thr.join();return 0;
}
注意事項
- std::promise的生命周期:需要確保promise對象在future對象需要使用它的時候保持有效,一旦promise對象銷毀,任何嘗試通過future訪問其結果的操作都將失敗。
- 線程安全:std::promise的set_value和set_exception方法是線程安全的,但仍應該避免在多個線程中同時調用它們,這意味著設計存在問題。
- 將std::promise對象傳給線程函數時,通常使用std::move或std::ref來避免不必要的復制。、
任務封裝器:std::packaged_task
std::packaged_task是一個模板類,主要用于將一個可調用對象包裝起來,以便異步執行,并能夠獲取其返回結果。它和std::future、std::thread緊密相關,常用于多線程編程中。
使用std::packaged_task的流程:
- 創建packaged_task對象:創建packaged_task對象需要傳遞一個可調用對象,將其封裝為異步任務。
- 獲取future對象:使用get_future方法可以獲取與packaged_task關聯的future對象,用于獲取異步操作的結果。
- 執行任務:通過operator()或調用thread在一個新線程中執行。注意packed_task對象是不能復制的,所以需要通過std::move或智能指針傳遞。
- 獲取結果:主線程種調用future對象的get方法可以等待異步任務完成并獲取其返回值。如果任務尚未完成,get方法會阻塞直到結果可用。
#include <iostream>
#include <thread>
#include <future>
#include <memory>
//pakcaged_task的使用
// pakcaged_task 是一個模板類,實例化的對象可以對一個函數進行二次封裝,
//pakcaged_task可以通過get_future獲取一個future對象,來獲取封裝的這個函數的異步執行結果int Add(int num1, int num2) {std::this_thread::sleep_for(std::chrono::seconds(3));return num1 + num2;
}int main()
{//std::packaged_task<int(int,int)> task(Add);//std::future<int> fu = task.get_future();//task(11, 22); task可以當作一個可調用對象來調用執行任務//但是它又不能完全的當作一個函數來使用//std::async(std::launch::async, task, 11, 22);//std::thread thr(task, 11, 22);//但是我們可以把task定義成為一個指針,傳遞到線程中,然后進行解引用執行//但是如果單純指針指向一個對象,存在生命周期的問題,很有可能出現風險//思想就是在堆上new對象,用智能指針管理它的生命周期auto ptask = std::make_shared<std::packaged_task<int(int,int)>>(Add);std::future<int> fu = ptask->get_future();std::thread thr([ptask](){(*ptask)(11, 22);});int sum = fu.get();std::cout << sum << std::endl;thr.join();return 0;
}
三種異步工具的比較
std::async
- 自動任務調度:async提供了一種簡單方便地方式來創建異步任務,只需要調用async,傳入函數和參數,就會自動執行異步任務,并返回future對象用于得到異步操作結果。
- 靈活性有限:盡管簡單,但靈活性有限,無法完全控制任務的調度方式(如任務在哪個線程運行)
- 適用場景:適用于簡單的異步任務,不需要復雜的任務調度和管理。
std::promise
- 手動設置結果:promise是一種更底層的機制,允許手動設置異步操作的結果,并將結果傳遞給與之關聯的future對象,使用時需要將promise和異步任務的邏輯結合在一起。
- 更多的代碼管理:使用promise需要手動管理任務的執行和結果的傳遞,因此比async更靈活和復雜。
- 使用場景:適用于需要手動控制任務結果傳遞的場景,或異步任務的結果是由多個步驟或線程決定的。
std::packaged_task
- 封裝可調用對象:packaged_task可以將一個可調用對象封裝起來,并通過future對象傳遞執行結果,這使它可以用于復雜的異步任務調度。
- 與其他工具結合使用:packaged_task的設計使得它可以很容易地與std::thread、自定義線程池、任務隊列等結合使用,靈活地管理任務的執行。
- 使用場景:適合需要高度靈活的任務管理、封裝任務并手動控制任務執行的場景,特別適用于實現自定義線程池。
異步線程池設計方案
線程池需要管理的數據:
- 控制線程停止的變量:支持原子操作,保證關閉線程池操作的線程安全
- 任務池:存放待執行的異步任務
- 互斥鎖與條件變量:保證線程安全與同步
- 一批工作線程:用于執行異步任務
線程池的實現思路:
- 在啟動時預先創建一批工作線程,執行線程入口函數:不斷從任務池中取出任務進行執行,沒有任務則等待條件變量就緒
- 用戶通過Push方法可以將要執行的任務傳入線程池,先將傳入的任務封裝為packaged_task異步任務后,通過packaged_task的get_future方法可以獲得future對象,然后將異步任務放入任務池,喚醒工作線程執行異步任務。
- 將future對象返回給使用者,使用者可以通過get方法獲取異步任務的執行結果。
#include <vector>
#include <iostream>
#include <thread>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <memory>using namespace std;class ThreadPool
{
public:using Functor = function<void()>;ThreadPool(int threadNum = 1): _stop(false){// 創建一批執行線程入口函數的線程for(int i = 0; i < threadNum; i++){_threads.emplace_back(&ThreadPool::entry, this);}}// 萬能引用template<typename F, typename ...Args>auto Push(F&& func, Args&& ...args) -> future<decltype(func(args...))> // 編譯時推導返回值類型{using return_type = decltype(func(args...));// 完美轉發auto tmp_func = bind(forward<F>(func), forward<Args>(args)...);auto task = make_shared<packaged_task<return_type()>>(tmp_func);future<return_type> fut = task->get_future();{unique_lock<mutex> lock(_mtx);_tasks.push_back([task](){(*task)();});}_cv.notify_one();return fut;}~ThreadPool(){Stop();}void Stop(){if(_stop == true) return ;_stop = true;_cv.notify_all(); // 喚醒所有線程,進行回收for(auto& thread : _threads){if(thread.joinable()){thread.join(); // 回收線程}}}
private:// 不斷從任務池中取出任務執行void entry(){while(!_stop){vector<Functor> tmp_tasks;{unique_lock<mutex> lock(_mtx);_cv.wait(lock, [this](){return _stop || !_tasks.empty(); // 當線程池停止(要回收線程)或任務池有任務時喚醒線程});tmp_tasks.swap(_tasks);}for(auto& task : tmp_tasks){task();}}}
private:atomic<bool> _stop;vector<Functor> _tasks;vector<thread> _threads;mutex _mtx;condition_variable _cv;
};int add(int a, int b)
{return a + b;
}
int main()
{ThreadPool pool;for(int i = 0; i < 10; i++){future<int> fut = pool.Push(add, 10, i);cout << fut.get() << endl;}return 0;
}