Linux —— 信號量
- 什么是信號量
- P操作(Wait操作)
- V操作(Signal操作)
- 信號量的類型
- 一些接口
- POSIX 信號量接口:
- 其他相關命令:
- 基于循環隊列的生產者和消費者模型
- 同步關系
- 多生產多消費
我們今天接著來學習信號量:
什么是信號量
信號量(Semaphore)是一種用于操作系統中管理共享資源訪問和同步的機制。它是一種特殊的數據結構,用來控制多個進程或線程對公共資源的訪問,以防止多個進程同時對同一資源進行訪問而導致的沖突問題。信號量維護了一個計數器,該計數器可以增加(通常稱為V操作或Signal操作)或減少(稱為P操作或Wait操作),并且這些操作都是原子的,即不可中斷。
P操作(Wait操作)
- 當一個進程想要訪問一個受保護的資源時,它會執行P操作。
- P操作會檢查信號量的值,如果信號量大于0,則減1,并允許進程繼續執行。
- 如果信號量等于0,表示資源已被占用,進程將被阻塞(等待)直到信號量的值變為非零。
V操作(Signal操作)
- 當一個進程完成對資源的訪問后,它會執行V操作。
- V操作會將信號量的值加1,表示釋放了一個資源。
- 如果有其他進程因為之前P操作而等待,此時可能會喚醒其中一個等待的進程。
信號量的類型
- 二值信號量:這種信號量只有0和1兩種狀態,
相當于一個互斥鎖,用于實現互斥訪問
。- 計數信號量:
可以用于控制有限數量的相同資源的訪問
,計數器的初始值代表資源的數量。
簡單來說,信號量的本質就是一個容量為N的鎖,跟一般的鎖不一樣,它可以放個多個線程訪問臨界資源,但是達到上限就不會讓線程進入了,而讓他們阻塞等待。
一些接口
在Linux環境下,信號量作為進程間通信的一種手段,主要用于同步和互斥控制。以下是Linux下信號量的一些常用接口,主要涉及System V信號量和POSIX信號量兩種類型:
POSIX 信號量接口:
- sem_init(sem_t *sem, int pshared, unsigned int value):
- 初始化一個POSIX信號量。
sem
是信號量的地址,pshared
指定信號量是否可以在多進程中共享(如果為1則是共享的),value
是信號量的初始值。
- sem_wait(sem_t *sem):[
- 執行P操作,如果信號量的值大于0,則減1并繼續執行;否則,進程將被阻塞直到信號量的值大于0。
- sem_post(sem_t *sem):
- 執行V操作,增加信號量的值,如果因此喚醒了等待的進程,則會選擇一個進行喚醒。
- sem_destroy(sem_t *sem):
- 銷毀一個POSIX信號量。
- sem_getvalue(sem_t *sem, int *sval):
- 獲取POSIX信號量的當前值,
sval
是存儲信號量值的指針。
其他相關命令:
- ipcs: 查看系統中所有的IPC設施狀態,包括消息隊列、共享內存段和信號量。
- ipcrm: 刪除指定的IPC設施,比如信號量集。
基于循環隊列的生產者和消費者模型
我們這里模擬實現一個基于循環隊列的生產者和消費者模型,首先我們實現一個循環隊列:
先把架子搭好:
#include<iostream>
#include<semaphore>
#include<pthread.h>
#include<cstring>
#include<vector>
#include<semaphore.h>
#include<unistd.h>template<class T>
class CircleQueue
{
public:CircleQueue():_size(10),_product_start(0),_consum_start(0){}~CircleQueue(){}private:
};
現在,我們要分析一下,這里面的同步關系:
同步關系
我們這里清楚,生產者會消費一個空間生產一個產品,并往前走一步,并且生產者和消費者是指向一個空間:
消費者會消費一個產品,騰出一個空間資源,然后往前走一步:
這個時候,我們可以分析出以下幾條信息:
- 生產者一定先跑,因為一開始有空間資源,沒有產品資源。
- 消費者一定比生產者跑的慢,因為消費者的速度是受生產者產出產品的速度決定的。
- 當空間資源被用完時,生產者停止生產,讓消費者消費之后,騰出空間資源之后再繼續生產。相反,如果沒有產品資源,消費者阻塞,讓生產者產出產品之后,再消費產品。
#include <iostream>
#include <semaphore>
#include <pthread.h>
#include <cstring>
#include <vector>
#include <semaphore.h>
#include <unistd.h>// 定義一個泛型循環隊列類,利用信號量實現線程安全的生產者消費者模型
template <class T>
class CircleQueue {
public:// 默認構造函數,初始化一個大小為10的循環隊列CircleQueue(): _size(10), // 隊列默認容量_product_start(0), // 生產者開始位置_consum_start(0) // 消費者開始位置{// 初始化空間信號量,初始值為隊列大小,表示可用空間數量sem_init(&_sem_space, 0, _size);// 初始化數據信號量,初始值為0,表示當前沒有可消費的數據sem_init(&_sem_data, 0, 0);// 初始化隊列容器_queue.resize(_size);}// 帶參數構造函數,允許用戶自定義隊列大小CircleQueue(int size): _size(size), // 用戶指定的隊列容量_product_start(0),_consum_start(0){sem_init(&_sem_space, 0, size); // 根據用戶指定的大小初始化空間信號量sem_init(&_sem_data, 0, 0);_queue.resize(size);}// 析構函數,釋放信號量資源~CircleQueue() {sem_destroy(&_sem_space);sem_destroy(&_sem_data);}// 生產者方法,向隊列中添加數據void Push(const T& data) {// 在嘗試放入數據前,先等待確保有空閑空間sem_wait(&_sem_space);// 將數據放入隊列的下一個生產位置_queue[_product_start] = data;// 更新生產者位置,并對索引取模以實現循環_product_start = (_product_start + 1) % _size;// 數據放入后,釋放數據信號量,通知消費者有新數據可取sem_post(&_sem_data);}// 消費者方法,從隊列中取出數據void Pop(T* out) {// 等待直到有數據可消費sem_wait(&_sem_data);// 從隊列的下一個消費位置取出數據*out = _queue[_consum_start];// 更新消費者位置,并對索引取模實現循環_consum_start = (_consum_start + 1) % _size;// 數據取出后,釋放空間信號量,表明隊列中有更多空間可填充sem_post(&_sem_space);}private:// 循環隊列的底層數據結構std::vector<T> _queue;int _size; // 隊列的最大容量// 生產者和消費者的當前位置索引int _product_start;int _consum_start;// 信號量用于同步控制sem_t _sem_space; // 控制隊列中的空閑空間sem_t _sem_data; // 控制隊列中的有效數據量
};
這段代碼實現了一個基于信號量的線程安全循環隊列模板類。它支持生產者線程向隊列中添加元素(通過Push
方法),同時允許消費者線程從隊列中取出元素(通過Pop
方法)。通過使用兩個信號量——_sem_space
和 _sem_data
——分別管理隊列的可用空間和有效數據量,確保了多線程環境下的正確同步與互斥。
基于這個,我們實現一下整體的代碼:
#include"CircleQueue.hpp"
#include<time.h>void* product(void* args)
{CircleQueue<int>* cq = static_cast<CircleQueue<int>*>(args);while(true){//生產數據int randomdata = rand() % 10 + 1;cq->Push(randomdata);std::cout << "Producter has product a number: " << randomdata <<std::endl; }return nullptr;
}void* consum(void* args)
{CircleQueue<int>* cq = static_cast<CircleQueue<int>*>(args);while(true){//拿出數據int outnumber = 0;cq->Pop(&outnumber);std::cout << "Consum gets a number: " << outnumber <<std::endl; sleep(1);}return nullptr;
}int main()
{srand(time(0));//創建線程pthread_t tid_product,tid_consum;CircleQueue<int>*cq = new CircleQueue<int>();//生產者pthread_create(&tid_product,nullptr,product,cq);//消費者pthread_create(&tid_consum,nullptr,consum,cq);pthread_join(tid_product,nullptr);pthread_join(tid_consum,nullptr);
}
大家可以調整一下生產者或者消費者的速度,看看情況怎么樣。
多生產多消費
這里注意一下,這里和互斥鎖的情況有點不一樣:
我們用一個通俗的例子來解釋:
假設有一天,你和你的好朋友(一共8個人),想去電影院看電影:
你們到售票機哪里去買票,此時電影院的座位很充足,所以你們都買到票了。
但是一看座位號,發現大家全都是1號座位
這就很尷尬了,這個場景可以類比到我們上面的代碼中,8個線程通過了信號量,但是都在往一個位置位置放東西,這樣不行,所以我們得出位置是每個人獨有的,一人一份,如果自己擁有,別人就不能擁有,所以為了保證每一個位置為一人獨有所以我們要給每個位置上鎖(電影院的座位)
如果有點繞,咋們來復盤一下:
- 電影院有很多位置,所以,我們多人可以都得到屬于自己的位置(類比我們的循環隊列)
- 為了保證我們的位置是獨一無二屬于自己,我們要給自己的位置上鎖,保證只有自己可以坐這個座位。(類比循環隊列中的下標)
解決完上面的問題,我們現在要做的,就是還要兩把鎖,一個保證生產的時候,放入時候的位子只屬于一個生產者進程,另一把鎖保證從一個位置里面拿產品的時候只屬于一個消費者進程:
#include <iostream>
#include <semaphore>
#include <pthread.h>
#include <cstring>
#include <vector>
#include <semaphore.h>
#include <unistd.h>// 定義一個泛型循環隊列類,結合信號量與互斥鎖實現線程安全的生產者消費者模型
template <class T>
class CircleQueue {
public:// 默認構造函數,初始化一個大小為10的循環隊列,并初始化信號量與互斥鎖CircleQueue(): _size(10), // 隊列默認容量_product_start(0), // 生產者開始位置_consum_start(0) // 消費者開始位置{sem_init(&_sem_space, 0, _size); // 初始化空間信號量,初始值為隊列大小sem_init(&_sem_data, 0, 0); // 初始化數據信號量,初始值為0pthread_mutex_init(&_p_mutex, nullptr); // 初始化生產者互斥鎖pthread_mutex_init(&_c_mutex, nullptr); // 初始化消費者互斥鎖_queue.resize(_size); // 初始化隊列向量}// 帶參數構造函數,允許自定義隊列大小CircleQueue(int size): _size(size),_product_start(0),_consum_start(0){sem_init(&_sem_space, 0, size);sem_init(&_sem_data, 0, 0);pthread_mutex_init(&_p_mutex, nullptr);pthread_mutex_init(&_c_mutex, nullptr);_queue.resize(size);}// 析構函數,釋放信號量與互斥鎖資源~CircleQueue() {sem_destroy(&_sem_space);sem_destroy(&_sem_data);pthread_mutex_destroy(&_p_mutex);pthread_mutex_destroy(&_c_mutex);}// 生產者方法,向隊列中添加數據void Push(const T& data) {// 確保有足夠的空間再進行生產sem_wait(&_sem_space);// 使用互斥鎖保護生產過程,防止與其它生產者并發沖突pthread_mutex_lock(&_p_mutex);_queue[_product_start] = data; // 添加數據_product_start = (_product_start + 1) % _size; // 更新生產者位置pthread_mutex_unlock(&_p_mutex); // 釋放鎖sem_post(&_sem_data); // 數據添加完畢,釋放數據信號量}// 消費者方法,從隊列中取出數據void Pop(T* out) {// 確保有數據可消費sem_wait(&_sem_data);// 使用互斥鎖保護消費過程pthread_mutex_lock(&_c_mutex);*out = _queue[_consum_start]; // 取出數據_consum_start = (_consum_start + 1) % _size; // 更新消費者位置pthread_mutex_unlock(&_c_mutex); // 釋放鎖sem_post(&_sem_space); // 釋放空間信號量}private:// 循環隊列的底層數據結構std::vector<T> _queue;int _size; // 隊列的最大容量// 生產者和消費者的當前位置索引int _product_start;int _consum_start;// 同步控制工具sem_t _sem_space; // 控制隊列中的空閑空間sem_t _sem_data; // 控制隊列中的有效數據量// 互斥鎖用于保護隊列訪問的原子性pthread_mutex_t _p_mutex; // 生產者使用的互斥鎖pthread_mutex_t _c_mutex; // 消費者使用的互斥鎖
};
這里打印的時候,由于屏幕也是公共資源,我這里加鎖,是保證打印的時候,只有生產者或者消費者打印信息:
#include"CircleQueue.hpp"
#include<time.h>pthread_mutex_t global_mutex = PTHREAD_MUTEX_INITIALIZER;void* product(void* args)
{CircleQueue<int>* cq = static_cast<CircleQueue<int>*>(args);while(true){//生產數據int randomdata = rand() % 10 + 1;cq->Push(randomdata);pthread_mutex_lock(&global_mutex);std::cout << "Producter has product a number: " << randomdata <<std::endl; pthread_mutex_unlock(&global_mutex);}return nullptr;
}void* consum(void* args)
{CircleQueue<int>* cq = static_cast<CircleQueue<int>*>(args);while(true){//拿出數據int outnumber = 0;cq->Pop(&outnumber);pthread_mutex_lock(&global_mutex);std::cout << "Consum gets a number: " << outnumber <<std::endl; pthread_mutex_unlock(&global_mutex);sleep(1);}return nullptr;
}int main()
{srand(time(0));//創建線程pthread_t tid_product[8],tid_consum[8];CircleQueue<int>*cq = new CircleQueue<int>();//生產者for(int i = 0; i < 8; i++){pthread_create(&tid_product[i],nullptr,product,cq);}//消費者for(int i = 0; i < 8; i++){pthread_create(&tid_consum[i],nullptr,consum,cq);}for(int i = 0; i < 8; i++){pthread_join(tid_product[i],nullptr);}for(int i = 0; i < 8; i++){pthread_join(tid_consum[i],nullptr);}}