文章目錄
- 1. Fork - Join模式
- 2. Producer - Consumer模式
- 3. Readers - Writers模式
- 4. Work Thread模式
- 5. Actor模式
- 6、 Pipeline模式概述
- 應用場景
- C++實現示例
- 代碼解釋
1. Fork - Join模式
- 原理:將一個大任務分解為多個子任務,這些子任務在不同的線程中并行執行,當所有子任務完成后,再將它們的結果合并得到最終結果。
- 優點:充分利用多核處理器的并行能力,提高計算效率;任務分解和結果合并的過程清晰,便于理解和實現分治算法等。
- 缺點:需要考慮子任務的劃分合理性以及線程間的同步問題,以確保結果的正確性。
- 適用場景:適用于能有效分解為多個獨立子任務的計算密集型任務,如大規模數據處理、科學計算中的矩陣運算等。
#include <iostream>
#include <vector>
#include <thread>
#include <numeric>// 計算數組某區間的和
void sum_subarray(const std::vector<int>& arr, int start, int end, int& result) {result = std::accumulate(arr.begin() + start, arr.begin() + end, 0);
}int main() {std::vector<int> arr(1000);std::iota(arr.begin(), arr.end(), 1);int mid = arr.size() / 2;int left_sum = 0, right_sum = 0;// Fork: 創建兩個線程分別計算左右子數組的和std::thread left_thread(sum_subarray, std::ref(arr), 0, mid, std::ref(left_sum));std::thread right_thread(sum_subarray, std::ref(arr), mid, arr.size(), std::ref(right_sum));// Join: 等待兩個線程完成left_thread.join();right_thread.join();int total_sum = left_sum + right_sum;std::cout << "Total sum: " << total_sum << std::endl;return 0;
}
2. Producer - Consumer模式
- 原理:生產者線程負責生產數據并將其放入緩沖區,消費者線程從緩沖區取出數據進行處理。通過緩沖區實現生產者和消費者的解耦,使它們可以以不同的速度運行。
- 優點:提高了程序的并發度和整體性能,生產者和消費者可以獨立優化和擴展,代碼可讀性和維護性較好。
- 缺點:需要合理設計緩沖區大小和同步機制,避免緩沖區溢出或不足,同時要處理好生產者和消費者之間的同步與互斥問題。
- 適用場景:廣泛應用于各種數據處理場景,如文件讀取與處理、網絡數據的接收與解析、消息隊列系統等。
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>std::queue<int> task_queue;
std::mutex mtx;
std::condition_variable cv;
bool is_running = true;// 生產者函數
void producer() {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(100));{std::unique_lock<std::mutex> lock(mtx);task_queue.push(i);std::cout << "Produced: " << i << std::endl;}cv.notify_one();}{std::unique_lock<std::mutex> lock(mtx);is_running = false;}cv.notify_all();
}// 消費者函數
void consumer() {while (true) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [] { return!task_queue.empty() ||!is_running; });if (task_queue.empty() &&!is_running) {break;}int task = task_queue.front();task_queue.pop();std::cout << "Consumed: " << task << std::endl;}
}int main() {std::thread producer_thread(producer);std::thread consumer_thread(consumer);producer_thread.join();consumer_thread.join();return 0;
}
3. Readers - Writers模式
- 原理:允許多個讀者線程同時訪問共享資源,但當有寫者線程訪問時,所有讀者和其他寫者都需等待。寫者線程具有更高的優先級,以確保數據的一致性。
- 優點:能有效提高對共享資源的訪問效率,在有大量讀操作和少量寫操作的情況下,能充分利用多核處理器的并行性。
- 缺點:實現較為復雜,需要精細地控制讀寫線程的同步與互斥,以避免數據不一致和死鎖等問題。
- 適用場景:常用于數據庫系統、文件系統等需要頻繁讀寫共享數據的場景,其中讀操作遠遠多于寫操作。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>std::mutex rw_mutex;
std::condition_variable rw_cv;
int readers = 0;
bool writer_active = false;// 讀者函數
void reader(int id) {{std::unique_lock<std::mutex> lock(rw_mutex);rw_cv.wait(lock, [] { return!writer_active; });++readers;}std::cout << "Reader " << id << " is reading." << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));{std::unique_lock<std::mutex> lock(rw_mutex);--readers;if (readers == 0) {rw_cv.notify_one();}}
}// 寫者函數
void writer(int id) {{std::unique_lock<std::mutex> lock(rw_mutex);rw_cv.wait(lock, [] { return!writer_active && readers == 0; });writer_active = true;}std::cout << "Writer " << id << " is writing." << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));{std::unique_lock<std::mutex> lock(rw_mutex);writer_active = false;rw_cv.notify_all();}
}int main() {std::thread r1(reader, 1);std::thread r2(reader, 2);std::thread w1(writer, 1);std::thread r3(reader, 3);r1.join();r2.join();w1.join();r3.join();return 0;
}
4. Work Thread模式
- 原理:有一個任務隊列,多個工作線程從隊列中獲取任務并執行。任務隊列可以是優先級隊列或普通隊列,工作線程根據一定的規則從隊列中取出任務進行處理。
- 優點:實現相對簡單,易于擴展工作線程的數量來應對不同的負載需求,能有效利用線程資源,避免線程的頻繁創建和銷毀。
- 缺點:任務隊列的管理和線程的調度需要一定的開銷,可能會出現任務饑餓現象,即某些低優先級任務長時間得不到執行。
- 適用場景:適用于處理大量異步任務的場景,如網絡服務器中的請求處理、分布式系統中的任務調度等。
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <vector>std::queue<std::function<void()>> work_queue;
std::mutex queue_mutex;
std::condition_variable queue_cv;
bool is_working = true;// 工作線程函數
void worker() {while (true) {std::function<void()> task;{std::unique_lock<std::mutex> lock(queue_mutex);queue_cv.wait(lock, [] { return!work_queue.empty() ||!is_working; });if (work_queue.empty() &&!is_working) {break;}task = std::move(work_queue.front());work_queue.pop();}task();}
}int main() {std::vector<std::thread> workers;for (int i = 0; i < 3; ++i) {workers.emplace_back(worker);}// 添加任務到隊列{std::unique_lock<std::mutex> lock(queue_mutex);for (int i = 0; i < 5; ++i) {work_queue.emplace([i] {std::cout << "Task " << i << " is being executed." << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));});}}queue_cv.notify_all();// 等待所有任務完成{std::unique_lock<std::mutex> lock(queue_mutex);is_working = false;}queue_cv.notify_all();for (auto& t : workers) {t.join();}return 0;
}
5. Actor模式
- 原理:將整個處理過程分為多個階段,每個階段由一個或多個Actor(參與者)負責。數據在這些Actor之間像流水線一樣依次傳遞,每個Actor處理完數據后將其傳遞給下一個Actor,直到最終處理完成。
- 優點:提高了系統的吞吐量和響應性,各個階段可以并行執行,而且易于維護和擴展,每個Actor可以獨立開發和測試。
- 缺點:需要精心設計流水線的各個階段和數據傳遞方式,以確保數據的正確流轉和系統的穩定性。
- 適用場景:適用于處理流程較為復雜且可以明確劃分為多個階段的任務,如視頻編碼解碼、圖像處理流水線、網絡數據包的處理等。
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>// 第一個階段 Actor
class FirstActor {
public:std::queue<int> input_queue;std::mutex mtx;std::condition_variable cv;bool is_finished = false;void run() {for (int i = 0; i < 5; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(100));{std::unique_lock<std::mutex> lock(mtx);input_queue.push(i);}cv.notify_one();}{std::unique_lock<std::mutex> lock(mtx);is_finished = true;}cv.notify_all();}
};// 第二個階段 Actor
class SecondActor {
public:FirstActor& first_actor;std::queue<int> output_queue;std::mutex mtx;std::condition_variable cv;bool is_finished = false;SecondActor(FirstActor& fa) : first_actor(fa) {}void run() {while (true) {int data;{std::unique_lock<std::mutex> lock(first_actor.mtx);first_actor.cv.wait(lock, [this] { return!first_actor.input_queue.empty() || first_actor.is_finished; });if (first_actor.input_queue.empty() && first_actor.is_finished) {break;}data = first_actor.input_queue.front();first_actor.input_queue.pop();}data *= 2;{std::unique_lock<std::mutex> lock(mtx);output_queue.push(data);}cv.notify_one();}{std::unique_lock<std::mutex> lock(mtx);is_finished = true;}cv.notify_all();}
};// 第三個階段 Actor
class ThirdActor {
public:SecondActor& second_actor;ThirdActor(SecondActor& sa) : second_actor(sa) {}void run() {while (true) {int data;{std::unique_lock<std::mutex> lock(second_actor.mtx);second_actor.cv.wait(lock, [this] { return!second_actor.output_queue.empty() || second_actor.is_finished; });if (second_actor.output_queue.empty() && second_actor.is_finished) {break;}data = second_actor.output_queue.front();second_actor.output_queue.pop();}std::cout << "Final result: " << data << std::endl;}}
};int main() {FirstActor first_actor;SecondActor second_actor(first_actor);ThirdActor third_actor(second_actor);std::thread t1(&FirstActor::run, &first_actor);std::thread t2(&SecondActor::run, &second_actor);std::thread t3(&ThirdActor::run, &third_actor);t1.join();t2.join();t3.join();return 0;
}
6、 Pipeline模式概述
Pipeline(流水線)模式是一種將一個復雜任務分解為多個獨立子任務,并讓這些子任務像流水線一樣依次執行的設計模式。每個子任務負責處理一部分工作,處理完后將結果傳遞給下一個子任務,最終完成整個復雜任務。這種模式可以提高系統的并發性能和可維護性,因為每個子任務可以獨立開發、測試和優化,而且不同的子任務可以并行執行。
應用場景
- 數據處理流程:例如在圖像或視頻處理中,一個完整的處理流程可能包括圖像讀取、降噪、增強、裁剪等多個步驟,每個步驟可以作為一個獨立的子任務在流水線上執行。
- 網絡請求處理:在網絡服務器中,一個請求可能需要經過接收、解析、驗證、業務邏輯處理、響應生成等多個階段,使用流水線模式可以高效地處理大量請求。
C++實現示例
以下是一個簡單的C++示例,模擬一個簡單的數據處理流水線,包含三個子任務:數據生成、數據處理和數據輸出。
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>// 第一個階段:數據生成
class DataGenerator {
public:std::queue<int> outputQueue;std::mutex mtx;std::condition_variable cv;bool isFinished = false;void run() {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(100));{std::unique_lock<std::mutex> lock(mtx);outputQueue.push(i);}cv.notify_one();}{std::unique_lock<std::mutex> lock(mtx);isFinished = true;}cv.notify_all();}
};// 第二個階段:數據處理
class DataProcessor {
public:DataGenerator& generator;std::queue<int> outputQueue;std::mutex mtx;std::condition_variable cv;bool isFinished = false;DataProcessor(DataGenerator& gen) : generator(gen) {}void run() {while (true) {int data;{std::unique_lock<std::mutex> lock(generator.mtx);generator.cv.wait(lock, [this] { return!generator.outputQueue.empty() || generator.isFinished; });if (generator.outputQueue.empty() && generator.isFinished) {break;}data = generator.outputQueue.front();generator.outputQueue.pop();}// 簡單的數據處理,這里將數據加倍data *= 2;{std::unique_lock<std::mutex> lock(mtx);outputQueue.push(data);}cv.notify_one();}{std::unique_lock<std::mutex> lock(mtx);isFinished = true;}cv.notify_all();}
};// 第三個階段:數據輸出
class DataOutputter {
public:DataProcessor& processor;DataOutputter(DataProcessor& proc) : processor(proc) {}void run() {while (true) {int data;{std::unique_lock<std::mutex> lock(processor.mtx);processor.cv.wait(lock, [this] { return!processor.outputQueue.empty() || processor.isFinished; });if (processor.outputQueue.empty() && processor.isFinished) {break;}data = processor.outputQueue.front();processor.outputQueue.pop();}std::cout << "Processed data: " << data << std::endl;}}
};int main() {DataGenerator generator;DataProcessor processor(generator);DataOutputter outputter(processor);std::thread generatorThread(&DataGenerator::run, &generator);std::thread processorThread(&DataProcessor::run, &processor);std::thread outputterThread(&DataOutputter::run, &outputter);generatorThread.join();processorThread.join();outputterThread.join();return 0;
}
代碼解釋
- DataGenerator類:負責生成數據,將生成的數據放入輸出隊列
outputQueue
,并通過條件變量cv
通知下一個階段有新數據可用。 - DataProcessor類:從
DataGenerator
的輸出隊列中獲取數據,對數據進行處理(這里簡單地將數據加倍),然后將處理后的數據放入自己的輸出隊列,并通知下一個階段。 - DataOutputter類:從
DataProcessor
的輸出隊列中獲取數據,并將其輸出到控制臺。 - main函數:創建三個線程分別運行三個階段的任務,并等待所有線程執行完畢。
通過這種方式,數據在三個階段之間依次傳遞,形成了一個簡單的流水線。每個階段可以獨立運行,提高了系統的并發性能。