個人主頁:chian-ocean
文章專欄-Linux
Linux線程同步實戰:多線程程序的同步與調度
- 個人主頁:chian-ocean
- 文章專欄-Linux
- 前言:
- 為什么要實現線程同步
- 線程饑餓(Thread Starvation)
- 示例:搶票問題
- 條件變量
- 條件變量的工作原理
- 常用操作:
- `pthread_cond_wait`
- `pthread_cond_signal`
- `pthread_cond_broadcast`
- 基于條件變量的生產消費者模型(阻塞隊列)
- 生產-消費者模型(也叫做游有界緩沖區)
- 模型原理
- 工作原理
- 同步機制
- 阻塞隊列
- 關鍵點分析
- 錯誤與改進:
- 生產消費模型
- 基于信號量的生產-消費者模型(環形隊列)
- POSIX信號量
- 模型原理
- 工作原理
- 同步機制
- 環形隊列
- 詳細分析:
- 類成員變量:
- 構造函數:
- 析構函數:
- `push()` 方法(生產者操作):
- `pop()` 方法(消費者操作):
- 主函數
- 消費者線程函數 (`cosumer`)
- 生產者線程函數 (`productor`)
- 主函數 (`main`)
前言:
Linux 是一個多任務操作系統,它通過提供多種線程同步機制來幫助開發人員有效地管理線程之間的協作與沖突。正確的線程同步不僅能避免這些問題,還能提升程序的可靠性和性能。
為什么要實現線程同步
線程饑餓(Thread Starvation)
線程饑餓是指在多線程程序中,某些線程因為無法獲取到所需的資源,長時間被阻塞,導致無法執行,甚至永遠無法執行。這種情況通常發生在低優先級線程無法獲得 CPU 時間,或者無法獲得必要的鎖資源時。線程饑餓會導致系統資源無法得到充分利用,程序的性能和響應性也會下降。
示例:搶票問題
- 最初的搶票問題出現了讀寫數據不一致問題,我們通過加鎖解決了問題。
#include <iostream>
#include <unistd.h>
#include <pthread.h>using namespace std;// 定義常量 NUM 表示創建線程的數量
#define NUM 10// 初始化互斥鎖 mutex,用于保護共享資源 tickets
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;// 定義共享資源 tickets,用于模擬票的數量
int tickets = 1000;// 線程的工作函數,用于搶購票
void* SanpUpTichets(void* args)
{// 將當前線程與主線程分離,確保線程結束時可以自動回收資源pthread_detach(pthread_self());// 無限循環,模擬持續搶票while(true){// 加鎖,保護共享資源 ticketspthread_mutex_lock(&mutex);// 如果還有票if(tickets > 0){// 打印當前線程的信息和剩余票數cout << "pthread: " <<(int64_t) args << " tickets: " << tickets << endl;tickets--; // 搶票,票數減一}else {// 如果票已經搶完,解鎖并跳出循環pthread_mutex_unlock(&mutex);break;}// 解鎖,允許其他線程訪問pthread_mutex_unlock(&mutex);// usleep(20); // 如果需要模擬延遲,可以解開注釋}return nullptr;
}int main()
{// 創建 NUM 個線程,模擬多個線程同時搶購票for(int i = 0 ; i < NUM ; i++){pthread_t tid;void* n = (void*)i; // 傳遞線程編號作為參數pthread_create(&tid, nullptr, SanpUpTichets, n); // 創建新線程并執行搶票函數}// 主線程休眠 10 秒,確保子線程有足夠時間搶票sleep(10);return 0;
}
代碼分析:
- 共享資源:
tickets
代表剩余票數,所有線程都會訪問并修改它。 - 互斥鎖:
pthread_mutex_t mutex
用于確保同一時刻只有一個線程能夠訪問和修改tickets
,避免數據沖突。 - 多線程創建: 創建了 10 個線程,每個線程執行
SanpUpTichets
函數,嘗試減少票數。 - 線程同步: 每個線程在修改
tickets
前加鎖,修改完后解鎖,確保線程安全。 - 問題:但是仍然存在一個問題,就是饑餓問題(發現都是8號進程進行搶票環節)。
條件變量
條件變量(Condition Variable)是多線程編程中的一種同步機制,用于在線程之間傳遞信號或同步操作。它允許一個線程等待某個條件成立,然后再繼續執行。條件變量通常與互斥鎖(mutex
)一起使用,來保護共享資源和確保線程同步
條件變量的工作原理
條件變量允許線程在某些條件成立時進行“等待”操作。當某個條件不滿足時,線程會進入等待狀態,直到被另一個線程通知條件已經滿足并可以繼續執行。
常用操作:
- 等待:一個線程可以調用
pthread_cond_wait
來等待某個條件成立。調用這個函數時,它會自動釋放與條件變量關聯的互斥鎖,并讓線程進入等待狀態,直到其他線程通過條件變量通知它。 - 通知:當某個條件滿足時,線程可以調用
pthread_cond_signal
或pthread_cond_broadcast
來通知等待的線程。pthread_cond_signal
會喚醒一個等待的線程,而pthread_cond_broadcast
會喚醒所有等待的線程。
pthread_cond_wait
該函數使得線程在滿足某個條件之前進入阻塞狀態。調用此函數時,線程將等待一個條件變量上的信號通知。調用此函數時會自動釋放與條件變量關聯的互斥鎖,并讓線程進入等待狀態,直到條件滿足或被其他線程通知喚醒。
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
參數:
-
cond
:這是要等待的條件變量。它是一個pthread_cond_t
類型的結構體,用來表示條件變量。 -
mutex
:與條件變量關聯的互斥鎖,它是一個pthread_mutex_t
類型的結構體。此互斥鎖用于保護共享資源的訪問,確保只有一個線程可以修改條件變量。
pthread_cond_signal
目的:此函數用于喚醒一個正在等待條件變量 cond
的線程。
語法:
int pthread_cond_signal(pthread_cond_t *cond);
參數:
cond
:指向條件變量的指針。
pthread_cond_broadcast
目的:此函數用于喚醒所有正在等待條件變量 cond
的線程。
語法:
int pthread_cond_broadcast(pthread_cond_t *cond);
參數:
cond
:指向條件變量的指針
基于條件變量的生產消費者模型(阻塞隊列)
生產-消費者模型(也叫做游有界緩沖區)
兩個進程(線程)共享一塊公共的緩沖區,其中一個生產者,將數據放入緩沖區;另外一個是消費者,將數據從緩沖區取走(也可以把這個問題一般化為m
個生產者和n
個消費者)
模型原理
- 生產者(Producer):生成數據、任務或產品,并將它們放入一個共享緩沖區。
- 消費者(Consumer):從共享緩沖區中取出數據并進行處理。
- 緩沖區(Buffer):充當生產者和消費者之間的中介,通常是一個有限的隊列或數組。緩沖區容量有限,當緩沖區已滿時,生產者需要等待;當緩沖區為空時,消費者需要等待。
工作原理
- 生產者:將生成的數據放入緩沖區。
- 消費者:從緩沖區獲取數據并進行處理。
- 緩沖區:充當共享資源,在多線程環境下進行同步。生產者和消費者的速度不同,可能會出現緩沖區滿或者為空的情況。
同步機制
在生產者-消費者模型中,通常需要使用同步機制來確保生產者和消費者之間不會發生沖突,常見的同步機制包括:
- 互斥鎖(Mutex):確保每次只有一個線程能夠訪問共享資源。
- 條件變量(Condition Variable):允許線程在特定條件下被掛起和喚醒,例如生產者等待緩沖區有空間時,消費者等待緩沖區有數據時。
- 信號量(Semaphore):控制對共享資源的訪問數量,通常用于限制對緩沖區的訪問。
阻塞隊列
#include<iostream>
#include<queue>
#include<pthread.h>#define defaultnum 5 // 定義隊列的默認容量為5// 阻塞隊列模板類,支持泛型T(可以存儲任意類型的數據)
template<class T>
class blockqueue
{
private:std::queue<T> _q; // 用于存儲數據的標準隊列int _cap = defaultnum; // 隊列的最大容量,默認為5pthread_mutex_t _lock; // 互斥鎖,用于保證線程安全pthread_cond_t _c_cond; // 消費者等待的條件變量,當隊列為空時,消費者線程會等待pthread_cond_t _p_cond; // 生產者等待的條件變量,當隊列滿時,生產者線程會等待
public:// 構造函數,接受隊列的最大容量blockqueue(int cap = defaultnum):_cap(cap){// 初始化互斥鎖和條件變量pthread_mutex_init(&_lock, nullptr);pthread_cond_init(&_c_cond, nullptr);pthread_cond_init(&_p_cond, nullptr);}// 析構函數,銷毀互斥鎖和條件變量~blockqueue(){pthread_mutex_destroy(&_lock); // 銷毀互斥鎖pthread_cond_destroy(&_c_cond); // 銷毀消費者條件變量pthread_cond_destroy(&_p_cond); // 銷毀生產者條件變量}// 生產者線程調用的push方法,插入數據到隊列中void push(const T& in){pthread_mutex_lock(&_lock); // 上鎖,確保線程安全// 如果隊列已滿,生產者需要等待if (_cap == _q.size()) {pthread_cond_wait(&_p_cond, &_lock); // 等待生產者條件變量}// 將數據放入隊列_q.push(in);// 喚醒一個消費者線程(如果有的話)pthread_cond_signal(&_c_cond);pthread_mutex_unlock(&_lock); // 解鎖}// 消費者線程調用的pop方法,從隊列中取出數據T pop(){pthread_mutex_lock(&_lock); // 上鎖,確保線程安全// 如果隊列為空,消費者需要等待while (_q.size() == 0) {pthread_cond_wait(&_c_cond, &_lock); // 等待消費者條件變量}// 從隊列中取出數據T data = _q.front();_q.pop();// 喚醒一個生產者線程(如果有的話)pthread_cond_signal(&_p_cond);pthread_mutex_unlock(&_lock); // 解鎖return data; // 返回取出的數據}
};
關鍵點分析
- 互斥鎖 (
_lock
):用于保護隊列的訪問,確保每次只有一個線程能訪問隊列。這樣可以避免多個線程同時修改隊列引發的數據競爭問題。 - 條件變量 (
_c_cond
和_p_cond
):_c_cond
用于消費者線程,當隊列為空時,消費者會等待該條件變量。_p_cond
用于生產者線程,當隊列滿時,生產者會等待該條件變量。
- 生產者操作 (
push
):- 如果隊列已滿,生產者會等待,直到有空位可以插入新的數據。
- 插入數據后,通過
pthread_cond_signal
喚醒一個等待的消費者線程。
- 消費者操作 (
pop
):- 如果隊列為空,消費者會等待,直到有新數據可以消費。
- 從隊列中取出數據后,通過
pthread_cond_signal
喚醒一個等待的生產者線程。
錯誤與改進:
- 線程阻塞的合理性:此實現正確處理了生產者和消費者的等待機制,保證了生產者不會在隊列滿時繼續生產,消費者不會在隊列空時繼續消費。
pthread_cond_wait
調用條件:使用while
而不是if
來等待消費,因為在高并發環境下,可能會出現虛假喚醒(即線程被喚醒后,隊列可能仍為空或已滿)。
生產消費模型
#include<iostream> // 引入標準輸入輸出庫
#include<unistd.h> // 引入Unix標準庫,提供sleep函數等
#include "blockqueue.hpp" // 引入blockqueue頭文件,定義了阻塞隊列
#include "task.hpp" // 引入task頭文件,定義了任務類
using namespace std;void * producter(void* args) // 生產者線程函數,傳入的參數為阻塞隊列的地址
{blockqueue<task>* bt = static_cast<blockqueue<task>*>(args); // 將參數轉換為blockqueue類型while(true){int x = rand() % 10; // 隨機生成一個整數x,范圍0到9int y = rand() % 10; // 隨機生成一個整數y,范圍0到9task t(x, y, opers[rand() % 4]); // 創建一個task對象,運算符從opers數組中隨機選擇t.run(); // 執行任務的run方法bt->push(t); // 將生成的任務t放入阻塞隊列cout << "生產一個數據: " << t.Gettask() << endl; // 輸出任務信息sleep(1); // 休眠1秒,模擬生產的間隔時間}return nullptr; // 返回空指針,結束線程函數
}void* consumer(void* args) // 消費者線程函數,傳入的參數為阻塞隊列的地址
{blockqueue<task>* bt = static_cast<blockqueue<task>*>(args); // 將參數轉換為blockqueue類型while(true){task data = bt->pop(); // 從阻塞隊列中取出一個任務cout << "消費一個數據: " << data.Getresult() << endl; // 輸出任務結果}
}int main()
{blockqueue<task>* bq = new blockqueue<task>(); // 創建一個新的阻塞隊列對象bqsrand(time(nullptr)); // 使用當前時間作為隨機數種子,以確保每次運行的隨機數不同pthread_t ctid, ptid; // 定義消費者線程和生產者線程的線程ID// 創建生產者線程,傳入bq作為參數pthread_create(&ptid, nullptr, producter, bq); // 創建消費者線程,傳入bq作為參數pthread_create(&ctid, nullptr, consumer, bq);// 等待消費者線程結束pthread_join(ctid, nullptr);// 等待生產者線程結束pthread_join(ptid, nullptr);return 0; // 主函數返回
}
- **生產者線程(
producter
):
- 該線程負責生產任務并將其加入到阻塞隊列中。
- 每次循環中,生產者隨機生成兩個整數
x
和y
,并根據一個隨機的運算符構造一個task
對象。 - 調用
task
對象的run
方法進行任務處理后,將任務推送到阻塞隊列中。 - 輸出任務的相關信息,并休眠1秒,模擬生產任務的過程。
- 消費者線程(
consumer
):
- 該線程負責從阻塞隊列中獲取任務并消費(處理)這些任務。
- 消費者通過
pop
方法從阻塞隊列中獲取任務,然后輸出任務的結果。
- 主函數(
main
):
- 創建一個
blockqueue<task>
類型的對象bq
,用于存放task
對象。 - 設置隨機數種子,以確保每次運行時生成的隨機數不同。
- 創建并啟動生產者線程和消費者線程,傳遞
bq
作為參數。 - 使用
pthread_join
等待生產者和消費者線程的結束。
基于信號量的生產-消費者模型(環形隊列)
POSIX信號量
sem_init()
:初始化信號量。sem_wait()
:執行“等待”操作,降低信號量的值,若信號量為0,則進程會被阻塞。sem_post()
:執行“釋放”操作,增加信號量的值,若有進程等待該信號量,會喚醒一個進程。sem_destroy()
:銷毀信號量。
模型原理
組件 | 描述 |
---|---|
生產者(Producer) | 不斷生產數據,嘗試放入緩沖區 |
消費者(Consumer) | 不斷消費數據,嘗試從緩沖區取出 |
環形緩沖區(Circular Queue) | 共享的有限大小的隊列,生產者寫入,消費者讀取 |
信號量(Semaphore) | 控制生產者和消費者行為,保證同步與互斥 |
工作原理
環形隊列是一種先進先出(FIFO)的結構,具備“循環”特性:
- 使用一個固定大小的數組
buffer[N]
。 - 使用兩個指針:
head
:消費者從這里取數據。tail
:生產者向這里放數據。
- 通過模運算(
% N
)實現循環結構:
當tail
或head
增加到數組尾部時,再次從頭開始。
同步機制
empty_slots
信號量:表示環形隊列中空槽的數量。生產者每次生產一個產品時,減少一個空槽;如果空槽為0,生產者將被阻塞,直到消費者取走產品,釋放空槽。full_slots
信號量:表示環形隊列中已填充的槽的數量。消費者每次消費一個產品時,減少一個滿槽;如果滿槽為0,消費者將被阻塞,直到生產者生產產品并釋放滿槽。mutex
信號量:保證生產者和消費者對環形隊列的互斥訪問,防止多個線程同時修改隊列,導致數據沖突。
環形隊列
這段代碼實現了一個基于 信號量 和 互斥鎖 的 環形隊列(Ring Queue) 類 Ringqueue
,其中 T
是隊列元素的類型。環形隊列的大小是可配置的,并且支持生產者和消費者并發訪問。
讓我們逐行分析這段代碼并添加詳細注釋:
const static int defaultcap = 6; // 默認隊列大小為6template<class T> // 泛型隊列,支持任意類型的元素
class Ringqueue
{
private:std::vector<T> _ringqueue; // 存儲環形隊列元素int _cap; // 隊列容量int _c_step; // 消費者隊列指針,表示下一個消費的位置int _p_step; // 生產者隊列指針,表示下一個生產的位置// 信號量,用于同步生產者和消費者的操作sem_t _cdata_sem; // 消費者信號量(數據),表示可供消費者消費的數據數量sem_t _pspace_sem; // 生產者信號量(空間),表示可供生產者生產的空槽數量// 互斥鎖,用于保護生產者和消費者對隊列的訪問,防止競爭條件pthread_mutex_t _mutex_c; // 用于消費者線程的互斥鎖pthread_mutex_t _mutex_p; // 用于生產者線程的互斥鎖public:// 構造函數,初始化環形隊列、信號量和互斥鎖Ringqueue(int cap = defaultcap): _ringqueue(cap), _cap(cap), _c_step(0), _p_step(0) // 初始化環形隊列,容量和指針{sem_init(&_cdata_sem, 0, 0); // 初始化消費者信號量,初始為0,表示沒有數據可消費sem_init(&_pspace_sem, 0, cap); // 初始化生產者信號量,初始為隊列容量,表示有空間可以生產pthread_mutex_init(&_mutex_c, nullptr); // 初始化消費者互斥鎖pthread_mutex_init(&_mutex_p, nullptr); // 初始化生產者互斥鎖}// 析構函數,銷毀信號量和互斥鎖~Ringqueue(){sem_destroy(&_cdata_sem); // 銷毀消費者信號量sem_destroy(&_pspace_sem); // 銷毀生產者信號量pthread_mutex_destroy(&_mutex_c); // 銷毀消費者互斥鎖pthread_mutex_destroy(&_mutex_p); // 銷毀生產者互斥鎖}// 插入數據到隊列(生產者操作)void push(const T& in){// 等待空槽信號量(即空間是否足夠,如果沒有空槽,生產者阻塞)sem_wait(&_pspace_sem);// 獲取生產者互斥鎖,確保只有一個生產者可以修改隊列pthread_mutex_lock(&_mutex_p);// 將數據插入到隊列的生產者指針位置_ringqueue[_p_step++] = in;// 更新生產者指針(確保指針在環形隊列中循環)_p_step %= _cap;// 釋放生產者互斥鎖pthread_mutex_unlock(&_mutex_p);// 增加消費者信號量,表示隊列中有新數據可供消費sem_post(&_cdata_sem);}// 從隊列取數據(消費者操作)T pop(){// 等待數據信號量(即是否有數據,如果沒有數據,消費者阻塞)sem_wait(&_cdata_sem);// 獲取消費者互斥鎖,確保只有一個消費者可以修改隊列pthread_mutex_lock(&_mutex_c);// 從隊列的消費者指針位置取出數據T data = _ringqueue[_c_step++];// 更新消費者指針(確保指針在環形隊列中循環)_c_step %= _cap;// 釋放消費者互斥鎖pthread_mutex_unlock(&_mutex_c);// 增加生產者信號量,表示隊列中有空槽可以生產數據sem_post(&_pspace_sem);// 返回消費者取出的數據return data;}
};
詳細分析:
類成員變量:
_ringqueue
:這是一個std::vector<T>
,用于存儲環形隊列的數據。_cap
:隊列的容量,表示隊列的最大長度。_c_step
和_p_step
:分別是消費者和生產者指針,用于指示隊列中下一個被訪問的元素的位置。它們是環形隊列的重要組成部分,通過模運算實現循環。_cdata_sem
和_pspace_sem
:消費者和生產者的信號量,用于同步生產者和消費者的操作,確保生產者不會在隊列滿時插入數據,消費者不會在隊列空時取出數據。_mutex_c
和_mutex_p
:用于保護消費者和生產者線程訪問隊列的互斥鎖,防止并發線程訪問隊列時發生數據競爭。
構造函數:
- 初始化隊列大小、信號量和互斥鎖。
sem_init(&_cdata_sem, 0, 0)
將消費者信號量初始為0,表示隊列中沒有數據可供消費。sem_init(&_pspace_sem, 0, cap)
將生產者信號量初始為隊列的容量,表示有足夠的空間可以進行生產。
析構函數:
- 銷毀信號量和互斥鎖,釋放資源。
push()
方法(生產者操作):
sem_wait(&_pspace_sem)
:首先,生產者通過等待pspace_sem
信號量來檢查是否有空槽可以插入數據。如果沒有空槽(即隊列滿),生產者線程將阻塞,直到有空槽可用。pthread_mutex_lock(&_mutex_p)
:然后,生產者獲取生產者的互斥鎖,確保在插入數據時不會有其他線程同時訪問隊列。- 插入數據:生產者將數據插入隊列,并更新
p_step
指針。 pthread_mutex_unlock(&_mutex_p)
:釋放互斥鎖,允許其他線程訪問隊列。sem_post(&_cdata_sem)
:最后,生產者通過增加cdata_sem
信號量來通知消費者隊列中有新的數據可供消費。
pop()
方法(消費者操作):
sem_wait(&_cdata_sem)
:消費者通過等待cdata_sem
信號量來檢查隊列是否有數據可以消費。如果沒有數據(即隊列空),消費者線程將阻塞,直到有數據可用。pthread_mutex_lock(&_mutex_c)
:然后,消費者獲取消費者的互斥鎖,確保在取出數據時不會有其他線程同時訪問隊列。- 取出數據:消費者從隊列中取出數據,并更新
c_step
指針。 pthread_mutex_unlock(&_mutex_c)
:釋放互斥鎖,允許其他線程訪問隊列。sem_post(&_pspace_sem)
:最后,消費者通過增加pspace_sem
信號量來通知生產者隊列中有空槽可以插入數據。
這段代碼展示了一個簡單的生產者-消費者模型,其中使用了環形隊列(Ringqueue
)來存儲任務(task
)。生產者線程生成任務并將其放入隊列,消費者線程從隊列中取出任務并處理。以下是代碼的詳細分析和注釋:
主函數
消費者線程函數 (cosumer
)
void* cosumer(void* args)
{Ringqueue<task>* rq = static_cast<Ringqueue<task>*>(args);while (true){task data = rq->pop();cout <<"消費了一個數據: " << data.Getresult() <<endl;}return nullptr;
}
cosumer
是消費者線程的入口函數。它會一直從隊列中取出任務并進行處理。- 使用
pop()
方法從隊列中取出一個task
對象。 Getresult()
假設是task
類中的方法,返回任務的結果。這里通過輸出該結果來模擬消費操作。while (true)
使得消費者線程持續運行,直到程序結束。
生產者線程函數 (productor
)
void* productor(void* args)
{Ringqueue<task>* rq = static_cast<Ringqueue<task>*>(args);while (true){int x = rand() % 10;int y = rand() % 10;task t(x, y, opers[rand() % opers.size()]);sleep(1);t.run();rq->push(t);cout << "生產一個數據:" << t.Gettask() << endl;usleep(1);}return nullptr;
}
productor
是生產者線程的入口函數。它持續生成任務并將其插入到環形隊列中。rand() % 10
生成兩個隨機數x
和y
,這兩個數作為任務的操作數。task t(x, y, opers[rand() % opers.size()])
創建一個新的任務對象,假設opers
是一個操作符數組(例如,加法、減法等),從中隨機選擇一個操作符。sleep(1)
模擬任務的生成過程,表示生產一個任務需要1秒鐘時間。t.run()
執行任務(假設run()
方法執行任務操作)。rq->push(t)
將任務推送到隊列中。cout
用于輸出生產的任務信息。usleep(1)
將線程掛起1微秒,減少CPU的占用。
主函數 (main
)
int main()
{srand(time(nullptr)); // 用當前時間作為隨機數種子,確保每次運行時生成不同的隨機數pthread_t ctid, pptid, ptid; // 聲明線程IDRingqueue<task>* rq = new Ringqueue<task>(); // 創建一個環形隊列對象,用于存儲任務// 創建消費者線程pthread_create(&ctid, nullptr, cosumer, rq);// 創建兩個生產者線程pthread_create(&ptid, nullptr, productor, rq);pthread_create(&pptid, nullptr, productor, rq);// 等待所有線程完成pthread_join(ctid, nullptr);pthread_join(ptid, nullptr);pthread_join(pptid, nullptr);return 0;
}
srand(time(nullptr))
:使用當前時間作為隨機數種子,確保每次程序運行時生成不同的隨機數序列。pthread_t ctid, pptid, ptid
:聲明三個線程ID變量,分別用于消費者線程和兩個生產者線程。Ringqueue<task>* rq = new Ringqueue<task>();
:創建一個環形隊列對象,用于存儲生產者和消費者之間傳遞的任務。pthread_create(&ctid, nullptr, cosumer, rq);
:創建消費者線程,傳入環形隊列指針rq
。pthread_create(&ptid, nullptr, productor, rq);
和pthread_create(&pptid, nullptr, productor, rq);
:創建兩個生產者線程,傳入同一個環形隊列指針rq
。pthread_join
用于等待線程執行完畢。由于pthread_create
是異步執行的,所以在主線程中使用pthread_join
等待每個線程的結束,確保程