參考《c++ Concurrency In Action 》第二章做的筆記
目錄
- 傳遞參數
- 量產線程
- 線程標識
傳遞參數
thread構造函數的附加參數會拷貝至新線程的內存空間中,即使函數中的采納數是引用類型,拷貝操作也會執行。如果我們期待傳入一個引用,必須使用std::ref
將參數轉換成引用形式:
如下:
void update_weight(weight_id w,weight_data& data); //1
void oops(weight_id w)
{weight_data data;//錯誤方式std::thread t(update_weight,w,data);//正確方式std::thread t(update_weight,w,std::ref(data));display_status();t.join();
}
這樣就能收到data的引用,而非data的拷貝副本。
與std:bind
的傳參機制相同,使用std::thread
創建線程時,傳遞參數的過程如下:
- 向
std::thread
構造函數傳參:一般實參會被拷貝至新線程的內存空間。具體拷貝的過程是由調用線程(主線程)在堆上創建并交由子線程管理,在子線程結束時同時被釋放。 - 向線程函數傳參:由于
std::thread
對象里一般保存的是參數的副本,為了效率同時兼顧一些只移動類型的對象,所有的副本均被std::move
到線程函數,即以右值的形式傳入。
示例:std::move轉移動態對象的所有權到線程中去:
void process_big_object(std::unique_ptr<big_object>);std::unique_ptr<big_object> p(new big_object);
p->prepare_data(47);
std::thread t(process_big_object,std::move(p));
在thread構造函數中執行move,big_object對象的所有權首先轉移到新創建線程的內部存儲中,之后再傳遞給process_big_object函數。
量產線程
void do_work(unsigned id);void f()
{std::vector<std::thread> threads;for(unsigned i = 0; i < 20; i++)threads.emplace_back(do_work,i); //產生線程for(auto& entry : threads) //對每個線程調用join()entry.join();
}
使用線程去分割一個算法的工作總量,所以在算法結束之前,所有線程必須結束。線程所做的工作都是獨立的,并且結果僅會受到共享數據的影響。如果f有返回值,在寫入返回值之前,程序會檢查使用共享數據的線程是否終止。
下面函數,會返回并發線程的數量
std::thread::hardware_concurrency()
下面展示并行版本的std::accumulate
。代碼將整體工作拆分成小任務,交給每個線程去做,并設置最小任務數,避免產生太多的線程,程序會在操作數量為0時拋出異常。
template<typename Iterator,typename T>
struct accumulate_block
{void operator()(Iterator first,Iterator last,T& result){result = std::accumulate(first,last,result);}
};template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{unsigned long const length = std::distance(first,last);if(!length) //1return init;unsigned long const min_per_thread = 25;unsigned long const max_threads = (length+min_per_thread-1)/min_per_thread; //2unsigned long const hardware_threads = std::thread::hardware_concurrency();unsigned long const num_threads = //3std::min(hardware_threads != 0 ? hardware_threads : 2,max_threads);unsigned long const block_size = length / num_threads; //4std::vector<T> results(num_threads);std::vector<std::thread> threads(num_threads - 1); //5Iterator block_start = first;for(unsigned long i = 0; i < num_threads-1; ++i){Iterator block_end = block_start;std::advance(block_end,block_size); //6threads[i] = std::thread( //7accumulate_block<Iterator,T>(),block_start,block_end,std::ref(result[i]));block_start = block_end; //8}accumulate_block<Iterator,T>(),block_start,last,results[num_threads-1]); //9for(auto& entry : threads)entry.join(); //10return std::accumulate(results.begin(),results.end(),init); //11
}
函數解釋:
如果輸入范圍為空1,就會得到init值。如果范圍內的元素多于1個,需要用范圍內元素的總數量除以線程塊中最小任務數,從而確定啟動線程的最大數量。由于上下文切換可能會降低線程性能,所以計算最大線程數以及硬件支持線程數,選擇較小值作為啟動線程數量3。如果為0的話,選擇2替代。
每個線程中處理的元素數量,是范圍中元素的總量除以線程的個數得出的4。
既然確定了線程個數,創建一個std::vector<T>
容器存放中間結果,并為線程創建一個std::vector<std::thread>
容器。因為在啟動之前已經有了一個主線程,所以啟動線程數是num_threads-1。
使用循環啟動線程,block_end指向當前塊的末尾6,并啟動一個新線程為當前塊累加結果7.當迭代器指向當前塊的末尾時,啟動下一個塊8.
啟動所有縣城后,9中線程會處理最終塊的結果,累加最終塊結果后,等待std::for_each創建線程10.之后使用std::accumulate將所有結果進行累加11
線程標識
線程標識為std::thread::id
類型,可以通過成員函數get_id()
來獲取。如果thread對象沒有和任何執行線程相關聯,將返回一個默認構造值,表示“無線程”。
如果兩個線程id相等,說明是同一個線程,或者都是“無線程”。
std::thread::id
實例常用作檢測線程是否需要進行一些操作,比如,當用線程來分割一項工作,主線程可能要做一些與其他線程不同的工作,啟動其他線程前,可以通過get_id得到自己線程id,每個線程都檢查一下,其擁有線程id是否與初始線程id相同。
std::thread::id master_thread;
void some_core_part_of_algorithm()
{if(std::this_thread::get_id() == master_thread)do_master_thread_work();elsedo_common_work();
}