目錄
- 前言
- 1. RAII接口模式封裝生產者消費者
- 2. 問答環節
- 總結
前言
杜老師推出的 tensorRT從零起步高性能部署 課程,之前有看過一遍,但是沒有做筆記,很多東西也忘了。這次重新擼一遍,順便記記筆記。
本次課程學習 tensorRT 高級-RAII 接口模式下的生產者消費者多 batch 實現
課程大綱可看下面的思維導圖
1. RAII接口模式封裝生產者消費者
這節課我們利用上節課學到的 RAII + 接口模式對我們的消費者生產者進行封裝
我們來看代碼
infer.hpp
#ifndef INFER_HPP
#define INFER_HPP#include <memory>
#include <string>
#include <future>class InferInterface{
public:virtual std::shared_future<std::string> forward(std::string pic) = 0;
};std::shared_ptr<InferInterface> create_infer(const std::string& file);#endif // INFER_HPP
infer.cpp
#include "infer.hpp"
#include <thread>
#include <queue>
#include <mutex>
#include <future>using namespace std;struct Job{shared_ptr<promise<string>> pro;string input;
};class InferImpl : public InferInterface{
public:virtual ~InferImpl(){worker_running_ = false;cv_notify_one();if(worker_thread_.joinable())worker_thread_.join();}bool load_model(const string& file){// 盡量保證資源哪里分配哪里釋放,哪里使用,這樣使得程序足夠簡單,而不是太亂// 線程內傳回返回值的問題promise<bool> pro;worker_running_ = true;worker_thread_ = thread(&InferImpl::worker, this, file, std::ref(pro));return pro.get_future().get();}virtual shared_future<string> forward(string pic) override{// printf("使用 %s 進行推理\n", context_.c_str());// 往隊列拋任務Job job;job.pro.reset(new promise<string>());job.input = pic;lock_guard<mutex> l(job_lock_);qjobs_.push(job);// 被動通知,一旦有新的任務需要推理,通知我即可// 發生通知的家伙cv_.notify_one();return job.pro->get_future();}// 實際執行模型推理的部分void worker(string file, promise<bool>& pro){// worker內實現,模型的加載,使用,釋放string context = file;if(context.empty()){pro.set_value(false);return;}else{pro.set_value(true);}int max_batch_size = 5;vector<Job> jobs;int batch_id = 0;while(worker_running_){// 等待接受的家伙// 在隊列取任務并執行的過程unique_lock<mutex> l(job_lock_);cv_.wait(job_lock_, [&](){// true 退出等待// false 繼續等待return !qjobs_.empty() || !worker_running_;});// 程序發送終止信號if(!worker_running_)break;while(jobs.size() < max_batch_size && !qjobs_.empty()){jobs.emplace_back(qjobs_.front());qjobs.pop();}// 可以在這里一次拿一批出來,最大拿 maxbatchsize 個 job 進行一次性處理// jobs inference -> batch inference// 執行 batch 推理for(int i = 0; i < jobs.size(); ++i){auto& job = jobs[i];char result[100];sprintf(result, "%s : batch-> %d[%d]", job.input.c_str(), batch_id, jobs.size());job.pro->set_value(result);}batch_id++;jobs.clear();// 模擬推理耗時this_thread::sleep_for(chrono::milliseconds(1000));}// 釋放模型printf("釋放: %s\n", context.c_str());context.clear();printf("Worker done.\n");}
private:atomic<bool> worker_running_{false};thread worker_thread_;queue<Job> qjobs_;mutex job_lock_;condition_variable cv_;
};shared_ptr<InferInterface> create_infer(const string& file){shared_ptr<InferImpl> instance(new Infer());if(!instance->load_model(file))instance.reset();return instance;
}
main.cpp
#include "infer.hpp"int main(){auto infer = create_infer("a");if(infer == nullptr){printf("failed.\n");return -1;}// 串行// auto fa = infer->forward("A").get();// auto fb = infer->forward("B").get();// auto fc = infer->forward("C").get();// printf("%s\n", fa.c_str());// printf("%s\n", fb.c_str());// printf("%s\n", fc.c_str());// 并行auto fa = infer->forward("A");auto fb = infer->forward("B");auto fc = infer->forward("C");printf("%s\n", fa.get().c_str());printf("%s\n", fb.get().c_str());printf("%s\n", fc.get().c_str()); printf("Program done.\n");return 0;
}
上述示例代碼相對復雜,結合了 RAII 和接口模式來實現模擬模型推理,具體是一個消費者-生產者模式的異步批處理機制,我們來簡單解讀下 infer.cpp 中具體干了些啥(form chatGPT)
1. 數據結構和類定義
- Job 結構體:這是一個任務結構,包含了一個 promise 對象(用于在工作線程中設置結果)和輸入數據,promise 又通過 shared_ptr 封裝了一層,可以讓結構體傳遞效率更高
- InferImpl 類,這是 InferInterface 的實現類,包含了異步處理的核心邏輯
2. InferImpl 類的方法和成員
- 析構函數:在對象銷毀時,將 worker_running_ 標志設置為 false,并通過條件變量喚醒工作線程。然后等待工作線程結束
- load_model 方法:模型加載函數,它實際上啟動了工作線程,并傳遞了一個 promise 對象來設置是否成功加載了模型
- forward 方法:這是暴露給使用者的接口,用于提交一個新的推理任務。這個方法將任務添加到隊列中,并通過條件變量喚醒工作線程
- worker 方法:這是工作線程的核心函數,它從隊列中取出任務并批量處理它們,然后使用 promise 設置結果
- 私有成員:
- worker_running_:一個原子布爾標志,表示工作線程是否正在運行
- worker_thread_:工作線程對象
- qjobs_:包含待處理任務的隊列
- job_lock_:保護任務隊列的互斥鎖
- cv_:條件變量,用于在有新任務到來或工作線程需要停止時喚醒工作線程
3. 工廠函數
- create_infer 函數:RAII 的體現,這個函數創建了一個 InferImpl 的實例,并嘗試加載模型。如果加載失敗,它將返回一個空的智能指針。
這個示例清晰地展示了如何使用 RAII 和接口模式來實現一個異步批處理機制,同時也展示了如何使用 C++11 的并發特性(如 thread、promise、condition_variable 等)來實現這種機制。
2. 問答環節
博主對多線程相關的知識不怎么了解,因此瘋狂詢問 chatGPT,故此做個記錄方便下次查看,以下內容來自于博主和 chatGPT 之間的對話
問題1:work_running_ 為什么是 atomic<boll> 類型,為什么不直接使用 bool 類型?什么是 atomic<bool> 類型?🤔
回答:atomic<bool> 是 C++11 引入的原子類型,它是為了支持多線程環境中無鎖的線程安全操作。使用原子類型可以確保對該類型的讀寫操作是原子的,即它們不會被其他線程中的操作中斷。
當多個線程可能同時訪問和修改一個變量時,直接使用基本數據類型(如 bool)可能會導致不確定的行為或數據競態(race conditions)。數據競態通常是多線程編程中的一個問題,當兩個或更多的線程同時訪問同一資源,并至少有一個線程修改該資源時,結果是不確定的。
使用 atomic<bool> 的好處:
1. 原子操作:atomic<bool> 保證了對其的讀寫操作是原子的,這意味著每個讀或寫操作都是不可中斷的。
2. 內存順序:原子操作還提供了內存順序語義,這確保了在多線程環境中的操作順序。
3. 無鎖操作:與使用互斥鎖相比,原子操作通常更快,因為它們通常是無鎖的。
在示例中,atomic<bool> 可能會在主線程中被設置,并在工作線程中被檢查。為了確保這種跨線程的通信是線程安全的,使用 atomic<bool> 是合適的。如果只使用普通的 bool,可能會導致數據競態,從而引起不可預測的行為。
總之,atomic<bool> 提供了一種線程安全的方式來讀取和修改一個布爾值,特別是在無需使用互斥鎖的情況下。
問題2:我在 CUDA 編程中也聽過原子操作,就是 GPU 中多個線程對同一個變量進行修改時要使用原子操作,這跟你提到的原子類型是不是類似的呢?🤔
回答:是的,CUDA 中的原子操作和 C++ 中的原子類型在概念上是非常相似的。兩者都是為了確保在并發環境中對數據的訪問和修改是線程安全的。
1. CUDA 中的原子操作:
- CUDA 提供了一系列原子函數,如 atomicAdd, atomicSub 等,這些函數可以確保在多個線程可能同時訪問同一內存位置時,這些操作是原子的。
- 這在 GPU 編程中尤為重要,因為 GPU 具有大量的并行執行單元,這使得多個線程可能會同時訪問同一個內存位置。
- 使用原子操作可以避免數據競態,并確保正確的結果。
2. C++ 中的原子類型:
- C++ 的 std::atomic 類模板提供了一系列原子操作,這些操作確保在多線程環境中對數據的讀取和寫入是線程安全的,而無需使用互斥鎖。
- 這對于 CPU 的多
- 線程編程非常有用,因為多個線程可能會并發地運行,并嘗試訪問和修改同一塊內存。
兩者的共同點是它們都提供了一種線程安全的方式來處理并發的數據訪問和修改。不同之處在于它們分別是為 GPU 和 CPU 設計的,并具有各自的特定語境和使用場景。
總的來說,無論是 CUDA 的原子操作還是 C++ 的原子類型,其核心目的都是為了提供一種在并發環境中安全訪問和修改數據的機制。
總結
本次課程學習了 RAII 接口模式下的生產者消費者多 batch 實現。把前面學的知識全都串起來了,首先通過 create_infer 創建 infer 資源并對模型進行加載,這是 RAII 體現,其次該函數返回的是接口類的智能指針,因此用戶只能看到 forward 這一個接口,這是接口模式的應用
在 InferImpl 具體實現類中,我們通過 forward 函數不斷向隊列中添加數據,而 worker 函數則會判斷隊列中的數據是否為空,若不為空則進行推理,若為空則繼續等待,是否等待是通過條件變量 condition_variable 的 wait 和 notify_one 來實現的,另外 worker 線程將推理結果返回到 forward 中是通過 promise 和 future 來實現,值得注意的是我們在 forward 中返回的并不是 future.get() 而是直接返回的一個 future 對象,具體什么時候 get 拿結果用使用者決定
這個示例把生產者和消費者模式、RAII接口模式以及異步機制等都結合起來,有點像 tensorRT_Pro 中推理實現部分的雛形😂