C++多線程推理、生產者消費者模式封裝
tensorRT從零起步邁向高性能工業級部署(就業導向) 課程筆記,講師講的不錯,可以去看原視頻支持下。
深度學習推理中的多線程知識概覽
- 本章介紹的多線程主要是指算法部署時所涉及的多線程內容,對于其他多線程知識需要自行補充
- 常用組件有 thread、mutex、future、condition_variable
- 啟動線程,thread,以及 join、joinable、detach、類函數啟動為線程
- 生產者-消費者模式
- 具體問題:隊列溢出的問題:生產太快,消費太慢;如何實現溢出控制
- 具體問題:生產者如何拿到消費反饋
- RAII 思想的生產者-消費者模式封裝,多 batch 的體現
thread、join、joinable、detach、常規/引用傳參、類函數
#include <thread>
#include <stdio.h>using namespace std;void worker() {printf("Hello World.\n");
}int main() {thread t(worker);// thread t;t.join();printf("Done.\n");return 0;
}
上面是一個最簡單的 cpp 多線程的程序
-
t.join()
等待線程結束,如果不加,就會在析構時提示異常,出現 core dumped,只要線程 t 啟動了(如果只是聲明thread t;
不算啟動),就必須要 join。 -
若 t 沒有啟動線程,如果 join ,也會 core dumped 異常;
-
根據以上兩點,如果我們在某些條件下啟動線程,某些條件下不啟動,該怎么辦呢? 用 joinable,如:
if (t.joinable()) t.join();
-
detach 分離線程,取消管理權,使得線程稱為野線程,不建議使用。野線程不需要 join,線程交給系統管理,程序退出后,所有線程才退出。
-
基本傳參:
void worker(int a) {printf("Hello Thread, %d\n", a); }int main() {thread t(worker, 12);if (t.joinable()) t.join();printf("Done.\n");return 0; }
-
引用傳參:
void worker(string& s) {printf("Hello Thread\n");s = "reference string"; }int main() {string param;thread t(worker, 12, std::ref(param));// thread t(worker, 12, param); 錯誤的引用傳參if (t.joinable()) t.join();printf("Done.\n");cout << param << endl;return 0; }
多線程的引用傳參有兩點需要注意:
- 傳入時需要使用
std::ref(param)
- 注意引用變量的聲明周期,如果在外面聲明的引用變量傳給子線程,而在子線程結束之前就在外面將變量釋放掉了,則在子線程中可能引發錯誤
- 傳入時需要使用
-
類的線程啟動
注釋掉的方式是用類的靜態方法的方式,不建議
class Infer { public:Infer() {// worker_thread_ = thread(infer_worker, this);worker_thread_ = thread(&Infer::infer_worker, this);}private:thread worker_thread_;// static infer_worker(Infer* self) { /* ... */ }void infer_worker() { /* ... */ } };
圖像處理的生產者消費者模式
首先看一個最簡單的生產者消費者模式,兩個線程分別執行 video_capture
和 infer_worker
兩個函數來生產(獲取)圖片和推理圖片。
其中 queue<string> qjobs_;
用于存儲待處理的圖片
#include <thread>
#include <queue>
#include <mutex>
#include <string>
#include <stdio.h>
#include <chrono>using namespace std;queue<string> qjobs_;
int get_image_time = 1000; // 先假設獲取一張圖片與推理一張圖片都是一秒
int infer_image_time = 1000;void video_capture() {int pic_id = 0;while (true) {char name[100];sprintf(name, "PIC-%d", pic_id++);printf("生產了一張新圖片: %s\n", name);qjobs_.push(name);this_thread::sleep_for(chrono::milliseconds(get_image_time));}
}void infer_worker() {while (true) {if (!qjobs_.empty()) {auto pic = qjobs_.front();qjobs_.pop();printf("消費掉一張圖片: %s\n", pic.c_str());this_thread::sleep_for(chrono::milliseconds(infer_image_time));}this_thread::yield(); // 沒有要處理的圖片,主動交出CPU,避免資源浪費}
}int main() {thread t0(video_capture);thread t1(infer_worker);t0.join();t1.join();return 0;
}
基本問題
共享資源訪問的問題
stl 中的 queue 隊列不是 thread-safe 的,我們需要自己加鎖來保證共享資源訪問的安全性
只需要將訪問共享變量的代碼部分用鎖保護起來即可:
mutex lock_;void video_capture() {int pic_id = 0;while (true) {{lock_guard<mutex> l(lock_);char name[100];sprintf(name, "PIC-%d", pic_id++);printf("生產了一張新圖片: %s\n", name);qjobs_.push(name);}this_thread::sleep_for(chrono::milliseconds(get_image_time));}
}void infer_worker() {while (true) {if (!qjobs_.empty()) {{lock_guard<mutex> l(lock_);auto pic = qjobs_.front();qjobs_.pop();printf("消費掉一張圖片: %s\n", pic.c_str());}this_thread::sleep_for(chrono::milliseconds(infer_image_time));}this_thread::yield(); // 沒有要處理的圖片,主動交出CPU,避免資源浪費}
}
問題1
隊列溢出的問題,生產太快,消費太慢;如何實現溢出控制
之前我們設定的是生產與消費均為一秒,但是若生產速率高于消費速率,則必然會出現隊列堆積現象。
解決方法:使用條件變量 condation_variable :如果隊列滿了,就不生產,等待隊列有空間,再生產,即我們要達成類似如下的邏輯:
if (qjobs_.size() < limit) wait();
qjobs_.push(name);
這就又有另一個問題,如何在隊列有空間時,通知 wait() 函數停止等待,實際上這可以在消費者的函數中進行,因為當我們消費掉隊列中的一張圖片,隊列肯定就有空間來存放新的圖片了。
完整的加 wait 的代碼:
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <string>
#include <stdio.h>
#include <chrono>using namespace std;queue<string> qjobs_;
mutex lock_;
condition_variable cv_;
int get_image_time_ = 300; // 先假設獲取一張圖片與推理一張圖片都是一秒
int infer_image_time_ = 1000;
const int limit_ = 5;void video_capture() {int pic_id = 0;while (true) {{unique_lock<mutex> l(lock_);char name[100];sprintf(name, "PIC-%d", pic_id++);printf("生產了一張新圖片: %s, 當前隊列大小: %d\n", name, (int)qjobs_.size());qjobs_.push(name);// condition_variable.wait(lock, predicate);// predicate 指定什么時候等待,什么時候停止等待cv_.wait(l, [&](){// return false 表示繼續等待; return true 表示停止等待return qjobs_.size() <= limit_;});}this_thread::sleep_for(chrono::milliseconds(get_image_time_));}
}void infer_worker() {while (true) {if (!qjobs_.empty()) {{lock_guard<mutex> l(lock_);auto pic = qjobs_.front();qjobs_.pop();printf("消費掉一張圖片: %s\n", pic.c_str());// 消費掉一個,就可以通知wait,停止等待cv_.notify_one();}this_thread::sleep_for(chrono::milliseconds(infer_image_time_));}this_thread::yield(); // 沒有要處理的圖片,主動交出CPU,避免資源浪費}
}int main() {thread t0(video_capture);thread t1(infer_worker);t0.join();t1.join();return 0;
}
測試可以看到,在達到我們設置的隊列上限之后,不會再一直生產新圖片導致隊列溢出:
生產了一張新圖片: PIC-0, 當前隊列大小: 0
消費掉一張圖片: PIC-0
生產了一張新圖片: PIC-1, 當前隊列大小: 0
生產了一張新圖片: PIC-2, 當前隊列大小: 1
生產了一張新圖片: PIC-3, 當前隊列大小: 2
消費掉一張圖片: PIC-1
生產了一張新圖片: PIC-4, 當前隊列大小: 2
生產了一張新圖片: PIC-5, 當前隊列大小: 3
生產了一張新圖片: PIC-6, 當前隊列大小: 4
消費掉一張圖片: PIC-2
生產了一張新圖片: PIC-7, 當前隊列大小: 4
生產了一張新圖片: PIC-8, 當前隊列大小: 5
消費掉一張圖片: PIC-3
生產了一張新圖片: PIC-9, 當前隊列大小: 5
消費掉一張圖片: PIC-4
生產了一張新圖片: PIC-10, 當前隊列大小: 5
消費掉一張圖片: PIC-5
生產了一張新圖片: PIC-11, 當前隊列大小: 5
消費掉一張圖片: PIC-6
生產了一張新圖片: PIC-12, 當前隊列大小: 5
消費掉一張圖片: PIC-7
生產了一張新圖片: PIC-13, 當前隊列大小: 5
注意:一旦進入 wait() ,會自動釋放鎖;一旦退出 wait() ,會加鎖。
問題2
生產者如何拿到消費者的反饋
我們消費者將生產者的圖片推理完成之后,肯定要將結果返回給生產者。比如在目標檢測中,video_capture 將捕獲到的圖片交給消費者處理完之后,需要得到物體框的坐標,再將框畫到原圖上進行顯示。那么這時,生產者應該如何拿到消費者的反饋呢?
這就要用到 promise 和 future,下面我們將 job 從單純的 string 輸入改為這樣一個結構體:
struct Job {shared_ptr<promise<string>> pro; // 返回結果,如果在目標檢測的例子中就是框string input; // 輸入,圖片
};
其中:
- input:輸入,還是輸入,實際中可能是圖片,這里還是用 string 代替
- pro:指向 promise 對象的共享指針,用來得到返回的結果
具體過程見下面代碼中的注釋,完整的代碼:
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <stdio.h>
#include <string>
#include <memory>
#include <future>
#include <chrono>using namespace std;struct Job {shared_ptr<promise<string>> pro; // 返回結果,如果在目標檢測的例子中就是框string input; // 輸入,圖片
};queue<Job> qjobs_;
mutex lock_;
condition_variable cv_;
int get_image_time_ = 300; // 先假設獲取一張圖片與推理一張圖片都是一秒
int infer_image_time_ = 1000;
const int limit_ = 5;void video_capture() {int pic_id = 0;while (true) {Job job;{unique_lock<mutex> l(lock_);char name[100];sprintf(name, "PIC-%d", pic_id++);printf("生產了一張新圖片: %s, 當前隊列大小: %d\n", name, (int)qjobs_.size());job.pro.reset(new promise<string> ());job.input = name;qjobs_.push(job);// condition_variable.wait(lock, predicate);// predicate 指定什么時候等待,什么時候停止等待cv_.wait(l, [&](){// return false 表示繼續等待; return true 表示停止等待return qjobs_.size() <= limit_;});}// .get() 實現等待, 直到promise->set_value()被執行了,這里的返回值就是result// 另外要注意,這里等待結果要放在鎖的外面,避免持有鎖等待結果,造成死鎖auto result = job.pro->get_future().get();// 處理resultprintf("Job %s -> %s\n", job.input.c_str(), result.c_str());this_thread::sleep_for(chrono::milliseconds(get_image_time_));}
}void infer_worker() {while (true) {if (!qjobs_.empty()) {{lock_guard<mutex> l(lock_);auto pjob = qjobs_.front();qjobs_.pop();printf("消費掉一張圖片: %s\n", pjob.input.c_str());auto res = pjob.input + " ---- infer result";pjob.pro->set_value(res);// 消費掉一個,就可以通知wait,停止等待cv_.notify_one();}this_thread::sleep_for(chrono::milliseconds(infer_image_time_));}this_thread::yield(); // 沒有要處理的圖片,主動交出CPU,避免資源浪費}
}int main() {thread t0(video_capture);thread t1(infer_worker);t0.join();t1.join();return 0;
}
輸出:
生產了一張新圖片: PIC-0, 當前隊列大小: 0
消費掉一張圖片: PIC-0
Job PIC-0 -> PIC-0 ---- infer result
生產了一張新圖片: PIC-1, 當前隊列大小: 0
消費掉一張圖片: PIC-1
Job PIC-1 -> PIC-1 ---- infer result
生產了一張新圖片: PIC-2, 當前隊列大小: 0
消費掉一張圖片: PIC-2
可以看到結果中能夠拿到對應圖片的推理結果。
RAII+接口模式對模型加載進行單批多圖推理封裝
考慮下面的推理類加載模型和推理的過程:(context_ 來代替模型,實際案例中,模型的加載與釋放比這要復雜的多,這里簡單地用 string 來代替)
class Infer {
public:bool load_model(const string& file) {// 異常邏輯處理if (!context_.empty()) {destory();}// 正常邏輯context_ = file;return true;}void forward() {// 異常邏輯處理if (context_.empty()) {printf("模型尚未加載.\n");return;}// 正常邏輯printf("正在使用 %s 進行推理.\n", context_.c_str());}void destory() {context_.clear();}
private:string context_;};
問題:
正常工作代碼中,異常邏輯的處理(如模型推理前未進行模型加載、推理后未進行模型銷毀等)需要耗費大量時間和代碼量,如果異常邏輯寫的不對,甚至會造成封裝的不安全性,導致程序崩潰。這樣封裝又難寫,又難用。
解決方法:
- RAII:資源獲取即初始化
- 接口模式:設計模式,是一種封裝模式,實現類與接口類分離的模式
我們分別來看這兩種解決方法帶來的好處:
RAII
我們使用這樣一個 create_infer
函數來代替 Infer
類的直接初始化:
shared_ptr<Infer> create_infer(const string& file) {shared_ptr<Infer> instance(new Infer());if (!instance->load_model(file)) instance.reset();return instance;
}int main() {// Infer infer; // 直接獲取類string file = "...";auto infer = create_infer(file); // 通過封裝的函數獲取類if (infer == nullptr) printf("模型加載失敗\n");infer->forward();return 0;
}
RAII 的特點:獲取 infer 實例,即表示加載模型。并且獲取資源與加載模型強綁定,加載模型成功,則表示獲取資源成功,加載模型失敗,則直接表示獲取資源失敗。
好處:
- 避免外部執行 load_model ,只有在 create_infer 中調用,不會有任何另外的地方調用,后面會進一步通過接口模式直接禁止外部執行
- 一個實例的 load_model 不會執行超過一次
- 獲取的模型一定初始化成功,因此 forward 時不必再做判斷
- 僅需在外部做一次 create 是否成功的判斷
- 不需要在 forward 函數、create 函數內再做異常判斷
接口模式
- 解決成員函數(如load_model)外部仍可調用的問題,我們之前說過,要保證它只在 create_infer 中調用
- 解決成員變量(如context_) 對外可見的問題
- 注意:這里的 context_ 雖然是 private 變量不可訪問,但是是對外可見的。對外可見可能造成的問題是:特殊的成員變量類型對頭文件的依賴,從而造成的命名空間污染/頭文件污染。比如成員變量是 cudaStream_t 類型,那就必須包含 cuda_runtime.h 頭文件。
- 接口類 (這里的 InferInterface 類) 是一個純虛類,其原則是:**只暴露調用者需要的函數,其他一概不暴露。**比如 load_model 已通過 RAII 封裝到 create_infer 內,這里 load_model 就屬于不需要暴露的類,內部如果有啟動線程如 start、stop 等,也不需要暴露。而 forward 這些函數肯定是需要暴露的。
- 此時,可以將這些聲明與實現分別放到 infer.hpp 和 infer.cpp 中
最終我們的完整代碼有三個文件: infer.hpp, infer.cpp, main.cpp 分別如下:
infer.hpp
// infer.hpp
#ifndef INFER_HPP
#define INFER_HPP#include <memory>
#include <string>class InferInterface {
public:virtual void forward() = 0;
};std::shared_ptr<InferInterface> create_infer(const std::string& file);#endif
infer.cpp
#include "infer.hpp"using namespace std;class InferImpl : public InferInterface {
public:bool load_model(const string& file) {context_ = file;return true;}virtual void forward() override {printf("正在使用 %s 進行推理.\n", context_.c_str());}void destory() {context_.clear();}private:string context_;
};shared_ptr<InferInterface> create_infer(const string& file) {shared_ptr<InferImpl> instance(new InferImpl());if (!instance->load_model(file)) instance.reset();return instance;
}
main.cpp
#include "infer.hpp"using namespace std;int main() {string file = "model a";auto infer = create_infer(file);if (infer == nullptr) {printf("模型加載失敗\n");return -1;}infer->forward();return 0;
}
原則總結:
- 頭文件,盡量只包含需要的部分
- 外界不需要的,盡量不讓外界看到,保持接口的簡潔
- 不要在頭文件中用
using namespace ...
,如果寫了的話,所有包含改頭文件的文件,就都打開了這個命名空間
多圖推理
最終我們給出多圖推理的代碼,同樣是三個文件,關鍵代碼已經給出注釋:
infer.hpp
// infer.hpp
#ifndef INFER_HPP
#define INFER_HPP#include <memory>
#include <string>
#include <future>class InferInterface {
public:virtual std::shared_future<std::string> forward(std::string pic) = 0;
};std::shared_ptr<InferInterface> create_infer(const std::string& file);#endif
infer.cpp
// infer.cpp
#include "infer.hpp"
#include <mutex>
#include <thread>
#include <future>
#include <queue>
#include <string>
#include <memory>
#include <chrono>
#include <condition_variable>using namespace std;struct Job {shared_ptr<promise<string>> pro;string input;
};class InferImpl : public InferInterface {
public:// 析構函數virtual ~InferImpl() {worker_running_ = false;cv_.notify_one();if (worker_thread_.joinable()) worker_thread_.join();}bool load_model(const string& file) {// 盡量保證資源在哪里分配,就在哪里使用,就在哪里釋放,這樣不會太亂。比如這里我們就都在 worker 函數內完成。// 這里的pro表示是否啟動成功promise<bool> pro;worker_running_ = true;worker_thread_ = thread(&InferImpl::worker, this, file, std::ref(pro));return pro.get_future().get();}virtual shared_future<string> forward(string pic) override {// printf("正在使用 %s 進行推理.\n", context_.c_str());Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock_);qjobs_.push(job);// 一旦有任務需要推理,發送通知cv_.notify_one();// return job.pro->get_future().get(); // 不能這樣直接返回模型推理的結果,因為這樣會等待模型推理結束,相當于還是串行return job.pro->get_future(); // 而是直接返回future對象,讓外部按需要再.get()獲取結果}void worker(string file, promise<bool>& pro) {// worker是實際執行推理的函數// context的加載、使用和釋放都在worker內string context = file;if (context.empty()) { // 未初始化,返回falsepro.set_value(false);return;}else { // 已初始化,返回true,之后正式開始進行推理pro.set_value(true);}int max_batch_size = 5;vector<Job> jobs; // 拿多張圖片 batchint batch_id = 0;while (worker_running_) {// 被動等待接收通知unique_lock<mutex> l(job_lock_);cv_.wait(l, [&](){// true:停止等待return !qjobs_.empty() || !worker_running_;});// 如果是因為程序發送終止信號而推出wait的if (!worker_running_) break;// 可以一次拿一批出來, 最大拿maxBatchSize個while (jobs.size() < max_batch_size && !qjobs_.empty()) {jobs.emplace_back(qjobs_.front());qjobs_.pop();}// 執行batch推理for (int i=0; i<jobs.size(); ++i) {auto& job = jobs[i];char name[100];sprintf(name, "%s : batch->%d[%d]", job.input.c_str(), batch_id, (int)jobs.size());job.pro->set_value(name);}batch_id++;jobs.clear();this_thread::sleep_for(chrono::milliseconds(infer_time_));}printf("釋放模型: %s\n", context.c_str());context.clear(); // 釋放模型printf("線程終止\n");}private:atomic<bool> worker_running_{false}; // 表示程序是否正在運行thread worker_thread_;queue<Job> qjobs_;mutex job_lock_;condition_variable cv_;int infer_time_ = 1000;
};shared_ptr<InferInterface> create_infer(const string& file) {shared_ptr<InferImpl> instance(new InferImpl());if (!instance->load_model(file)) instance.reset();return instance;
}
main.cpp
// main.cpp
#include "infer.hpp"using namespace std;
int main() {string file = "model a";auto infer = create_infer(file);if (infer == nullptr) {printf("模型加載失敗\n");return -1;}auto fa = infer->forward("A");auto fb = infer->forward("B");auto fc = infer->forward("C");printf("%s\n", fa.get().c_str());printf("%s\n", fb.get().c_str());printf("%s\n", fc.get().c_str());// auto fa = infer->forward("A").get();// auto fb = infer->forward("B").get();// auto fc = infer->forward("C").get();// printf("%s\n", fa.c_str());// printf("%s\n", fb.c_str());// printf("%s\n", fc.c_str());printf("程序終止\n");return 0;
}
想一下,如果按照注釋掉的部分的方式來進行推理的話,會有什么不同呢?
會每次都等待結果,無法進行單批次多圖處理。