目錄
- 1. 環形隊列的概念與實現方法
- 1.1 環形隊列的概念
- 1.2 環形隊列的一般實現方法
- 2. 多線程相關的信號量概念與接口
- 2.1 信號量類型
- 2.2 信號量的初始化與銷毀
- 2.3 信號量的P申請、V釋放操作
- 3. 基于環形隊列實現p、c模型的設計方案
- 3.1 環形隊列(ringqueue)作為共享資源緩沖區的設計要求
- 3.2 環形隊列作為共享資源緩沖區的實現方法
- 4. 代碼實現
- 4.1 線程類
- 4.2 線程執行的任務
- 4.3 RingQueue類
- 4.4 主干代碼
1. 環形隊列的概念與實現方法
1.1 環形隊列的概念
- 環形隊列:
??容量固定,首尾相連的一個隊列(遵循先進先出),于邏輯上形狀為一個環形。當存儲數據超過容量上限后,再次給環形隊列插入元素時,不會產生越界的情況。新的數據會回繞到數據首部進行插入,將空間中的舊數據覆蓋。
1.2 環形隊列的一般實現方法
??最常用于實現環形隊列的數據結構為順序表,其通過對下標的取模操作,就可以很好的實現存儲空間邏輯上的環形要求,具體實現方案如下:
方案1:
-
開辟要求容量
cap
大小的順序表 -
以變量
begin
記錄隊列中首個數據的下標,起始設置為0 -
以變量
end
記錄隊列中新數據插入位置的下標,起始設置為0
-
以變量
count
記錄隊列中存儲數據的個數,每次插入數據后進行++
,每次刪除數據后進行--
。隊列為空與為滿時,begin
與end
的位置相同,因此,需要以count
中記錄的值來做區分 -
插入元素時,向
end
記錄的下標位置寫入數據,然后再++
-
刪除元素時,只需讓
begin
進行++
即可,begin
與end
下標之間的空間中存儲著隊列中的數據 -
begin
與end
每次++
后都要對其進行取模操作%= cap
方案2:
??方案2的大體實現思路與方案1相同,只是于隊列空滿判定實現上有所區別。此種方法,會將隊列額外多開辟出一塊空間,隊列容量為cap
,實際大小為cap + 1
。此塊多出來的空間不用來存放數據,而是用來標識環形隊列的滿狀態(end + 1) % (cap + 1) == begin
。
2. 多線程相關的信號量概念與接口
2.1 信號量類型
#include <semaphore.h>
//原生線程庫中的內容,編譯時帶-lpthread選項
sem_t sem;//同樣為計數器
2.2 信號量的初始化與銷毀
信號量的初始化:
int sem_init(sem_t *sem, int pshared, unsigned int value);
- 參數1
sem_t*
: 傳入需要初始化的信號量地址 - 參數2
int pshared
: 用于父子進程之間共享設置為1
,用于多線程之間共享設置為0
- 參數3
unsigned int value
: 用于設置信號量的大小 - 返回值
int
: 成功時,返回0;失敗時,返回-1,并將錯誤碼寫入errno
信號量的銷毀:
int sem_destroy(sem_t* sem);
- 參數
sem_t* sem
: 傳入需要銷毀的信號量的地址 - 返回值
int
: 成功時,返回0;失敗時,返回-1,并將錯誤碼寫入errno
2.3 信號量的P申請、V釋放操作
信號量的P操作:
int sem_wait(sem_t* sem);
- 參數
sem_t* sem
: 傳入需要進行P操作的信號量地址 - 返回值
int
: 成功時,返回0;失敗時,返回-1,并將錯誤碼寫入errno
信號量的V操作:
int sem_post(sem_t* sem);
- 參數
sem_t* sem
: 傳入需要進行V操作的信號量地址 - 返回值
int
: 成功時,返回0;失敗時,返回-1,并將錯誤碼寫入errno
??信號量不僅僅其申請與釋放操作都是互斥的,而且本身還同時具有計數的作用。因此,在功能上,其就相當于是互斥鎖與條件變量的結合,但其又只需要一條操作語句就可以實現互斥鎖與條件變量的配合效果。所以,在代碼編寫角度,信號量比互斥鎖 + 條件變量
更簡單也更簡潔。
3. 基于環形隊列實現p、c模型的設計方案
3.1 環形隊列(ringqueue)作為共享資源緩沖區的設計要求
??生產者生產數據存入隊列,消費者消費數據從隊列中獲取,此處使用方案1中的環形隊列設計方式。
- 生產者與消費者訪問共享資源時,需要保持互斥且同步。
- 環形隊列中有多塊空間,只有隊列為滿或為空時,生產者與消費者才會訪問到同一塊空間,即訪問共享資源。因此,除開隊列為空為滿的情況之外,其他場景中,消費者與生產者都可以并發地訪問環形隊列。
3.2 環形隊列作為共享資源緩沖區的實現方法
- 對于生產者而言,其需要向隊列中插入數據,所以,隊列中空余的空間(room)是其所需的資源
- 對于消費者而言,其需要從隊列中獲取數據,所以,隊列空間中存儲的數據(data)是其所需的資源
??定義兩個信號量room_sem
與data_sem
,分別作為生產者資源與消費者資源的計數器,每次在生產者、消費者線程訪問環形隊列之前,都要預先申請對應的信號量資源。變量productor_index
記錄生產者生產資源的放置位置,變量consumer_index
記錄消費者從環形隊列中獲取數據的位置。生產者之間、消費者之間它們對于環形隊列的訪問需要保持互斥性,此處實現可以使用一把鎖來實現,但這樣生產者、消費者之間大多數情況下的并發就會被影響,因此,我們定義分別定義兩把鎖,來分別實現同類之間的互斥,彼此之間并發。
- 環形隊列作為共享資源緩沖區的優勢
??隊列中擁有多塊空間,生產者與消費者線程大多數情況下都不會訪問同一塊資源,因此,可以保證它們之間的并發訪問,提高程序的效率。
4. 代碼實現
4.1 線程類
namespace ThreadModule
{template<typename T>using func_t = function<void(T&, string)>;template<typename T>class Thread{public:Thread(func_t<T> func, T& data, string name = "none-thread"):_func(func), _data(data), _name(name), _stop(true){}~Thread(){}void Execute(){_func(_data, _name);}static void* threadroutine(void* arg){Thread<T>* ptd = static_cast<Thread<T>*>(arg);ptd->Execute();return nullptr;}bool start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(n){return false; }_stop = false;return true;}void join(){if(!_stop){pthread_join(_tid, nullptr);}}void detach(){if(!_stop){pthread_detach(_tid);}}string name(){return _name;}void stop(){_stop = true;}private:pthread_t _tid;string _name;func_t<T> _func;T& _data;bool _stop;};}
4.2 線程執行的任務
using task_t = function<void()>;void download()
{cout << "task is download" << endl;
}
4.3 RingQueue類
#ifndef RING_QUEUE
#define RING_QUEUE
#include <vector>
#include <semaphore.h>
#include <pthread.h>template<typename T>
class RingQueue
{
private:void P(sem_t* sem){sem_wait(sem);//wait申請}void V(sem_t* sem){sem_post(sem);//post單詞譯為放入,post操作為釋放}void Lock(pthread_mutex_t* lock){pthread_mutex_lock(lock);}void Unlock(pthread_mutex_t* lock){pthread_mutex_unlock(lock);}public:RingQueue(int cap):_cap(cap), _ringbuffer(cap), _productor_index(0), _consumer_index(0){sem_init(&_room_sem, 0, _cap);//父子進程之間共享pshared: 1, 多線程之間共享pshared: 0sem_init(&_data_sem, 0, 0);pthread_mutex_init(&_productor_lock, nullptr);pthread_mutex_init(&_consumer_lock, nullptr);}void Enqueue(const T& data){P(&_room_sem);Lock(&_productor_lock);_ringbuffer[_productor_index++] = data;_productor_index %= _cap;Unlock(&_productor_lock);V(&_data_sem);}void Pop(T* data){P(&_data_sem);Lock(&_consumer_lock);*data = _ringbuffer[_consumer_index++];_consumer_index %= _cap;Unlock(&_consumer_lock);V(&_room_sem);}~RingQueue(){sem_destroy(&_room_sem);sem_destroy(&_data_sem);pthread_mutex_destroy(&_productor_lock);pthread_mutex_destroy(&_consumer_lock);}private:vector<T> _ringbuffer;int _cap;int _productor_index;int _consumer_index;sem_t _room_sem;sem_t _data_sem;pthread_mutex_t _productor_lock;pthread_mutex_t _consumer_lock;
};#endif
4.4 主干代碼
using ringqueue_t = RingQueue<task_t>;void ProductorRun(ringqueue_t& rq, string name)
{while(true){rq.Enqueue(download);cout << name << " is producted" << endl;}
}void ConsumerRun(ringqueue_t& rq, string name)
{while(true){sleep(1);task_t task;rq.Pop(&task);cout << name << " get : "; task();//bad_function_call: 嘗試調用沒有目標的function對象時會出現此異常}
}void InitComm(vector<Thread<ringqueue_t>>& threads, ringqueue_t& rq, int num, func_t<ringqueue_t> func, string who)
{for(int i = 0; i < num; i++){string name = who + '-' + to_string(i + 1);threads.emplace_back(func, rq, name);}
}void InitProductor(vector<Thread<ringqueue_t>>& threads, ringqueue_t& rq, int num)
{InitComm(threads, rq, num, ProductorRun, "productor");
}void InitConsumer(vector<Thread<ringqueue_t>>& threads, ringqueue_t& rq, int num)
{InitComm(threads, rq, num, ConsumerRun, "consumer");
}void StartAll(vector<Thread<ringqueue_t>>& threads)//vector必須用引用,存儲線程tid
{for(auto& thread : threads){thread.start();}
}void WaitAllThread(vector<Thread<ringqueue_t>> threads)//嘗試用拷貝對象是否能夠正常回收進程
{for(auto thread : threads){thread.join();}
}int main()
{ringqueue_t rq(5);vector<Thread<ringqueue_t>> threads;InitProductor(threads, rq, 3);InitConsumer(threads, rq, 2);StartAll(threads);WaitAllThread(threads);return 0;
}
??此處設計時,將創建線程類與真正創建并啟動線程分開執行。這是因為直接以threads.back().start()
創建后調用啟動線程,其中threads.back()
迭代器可能會由于不斷向threads
中插入新的線程類對象,導致發生擴容,最終使得原迭代器失效。