文章目錄
- 九、多線程
- 8. POSIX信號量
- 根據信號量+環形隊列的生產者消費者模型代碼
- 結果演示
- 未完待續
九、多線程
8. POSIX信號量
POSIX信號量和SystemV信號量作用相同,都是用于同步操作,達到無沖突的訪問共享資源目的。 但POSIX可以用于線程間同步。
創建多線程的信號量:
銷毀多線程之間的信號量:
對信號量做P操作(申請資源):
對信號量做V操作(釋放資源):
根據信號量+環形隊列的生產者消費者模型代碼
Makefile:
cp_ring:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f cp_ring
Thread.hpp:
#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>namespace ThreadModule
{template<typename T>using func_t = std::function<void(T&, const std::string& name)>;template<typename T>class Thread{public:void Excute(){_func(_data, _threadname);}public:Thread(func_t<T> func, T& data, const std::string &name="none-name"): _func(func), _data(data), _threadname(name), _stop(true){}static void *threadroutine(void *args){Thread<T> *self = static_cast<Thread<T> *>(args);self->Excute();return nullptr;}bool Start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if(!n){_stop = false;return true;}else{return false;}}void Detach(){if(!_stop){pthread_detach(_tid);}}void Join(){if(!_stop){pthread_join(_tid, nullptr);}}std::string name(){return _threadname;}void Stop(){_stop = true;}~Thread() {}private:pthread_t _tid;std::string _threadname;T& _data;func_t<T> _func;bool _stop;};
}#endif
RingQueue.hpp:
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>// 環形隊列類模板
template<class T>
class RingQueue
{
private:// 申請資源void P(sem_t& sem){sem_wait(&sem);}// 釋放資源void V(sem_t& sem){sem_post(&sem);}// 加鎖void Lock(pthread_mutex_t& mutex){pthread_mutex_lock(&mutex);}// 解鎖void Unlock(pthread_mutex_t& mutex){pthread_mutex_unlock(&mutex);}
public:RingQueue(int cap):_cap(cap),_ring_queue(cap),_prodeucer_step(0),_consumer_step(0){sem_init(&_room_sem, 0, _cap);sem_init(&_data_sem, 0, 0);pthread_mutex_init(&_prodeucter_mutex, nullptr);pthread_mutex_init(&_consumer_mutex, nullptr);}// 生產者的入隊列函數void Enqueue(const T& in){// 申請空間資源P(_room_sem);// 加鎖Lock(_prodeucter_mutex);// 入隊列_ring_queue[_prodeucer_step++] = in;// 環形,繞一圈_prodeucer_step %= _cap;// 解鎖Unlock(_prodeucter_mutex);// 釋放數據資源V(_data_sem);}// 消費者的出隊列函數void Pop(T* out){// 申請數據資源P(_data_sem);// 加鎖Lock(_consumer_mutex);// 出隊列*out = _ring_queue[_consumer_step++];_consumer_step %= _cap;// 解鎖Unlock(_consumer_mutex);// 釋放空間資源V(_room_sem);}~RingQueue(){sem_destroy(&_room_sem);sem_destroy(&_data_sem);pthread_mutex_destroy(&_prodeucter_mutex);pthread_mutex_destroy(&_consumer_mutex);}
private:// 數組模擬環形隊列std::vector<T> _ring_queue;// 容量int _cap;// 生產者和消費者的位置指針int _prodeucer_step;int _consumer_step;// 信號量sem_t _room_sem;sem_t _data_sem;// 互斥鎖pthread_mutex_t _prodeucter_mutex;pthread_mutex_t _consumer_mutex;
};
Task.hpp:
#pragma once#include <iostream>
#include <functional>using Task = std::function<void()>;void Download()
{std::cout << "Downloading..." << std::endl;
}
Main.cc:
#include "RingQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>using namespace ThreadModule;
// 創建類型別名
using ringqueue_t = RingQueue<Task>;// 消費者線程
void Consumer(ringqueue_t& rq, const std::string& name)
{while (true){// 獲取任務Task t;rq.Pop(&t);std::cout << "Consumer " << name << " : ";// 執行任務t();}
}// 生產者線程
void Productor(ringqueue_t& rq, const std::string& name)
{while (true){// 發布任務rq.Enqueue(Download);std::cout << "Productor " << name << " : " << "Download task" << std::endl;sleep(1);}
}// 啟動線程
void InitComm(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq, func_t<ringqueue_t> func)
{for (int i = 0; i < num; i++){// 創建一批線程std::string name = "thread-" + std::to_string(i + 1);threads->emplace_back(func, rq, name);}
}// 創建消費者線程
void InitConsumer(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq)
{InitComm(threads, num, rq, Consumer);
}// 創建生產者線程
void InitProductor(std::vector<Thread<ringqueue_t>>* threads, int num, ringqueue_t& rq)
{InitComm(threads, num, rq, Productor);
}// 等待所有線程結束
void WaitAllThread(std::vector<Thread<ringqueue_t>>& threads)
{for (auto& thread : threads){thread.Join();}
}// 啟動所有線程
void StartAll(std::vector<Thread<ringqueue_t>>& threads)
{for (auto& thread : threads){thread.Start();}
}int main()
{// 創建阻塞隊列,容量為5ringqueue_t* rq = new ringqueue_t(10);// 創建線程std::vector<Thread<ringqueue_t>> threads;// 創建 1個消費者線程InitConsumer(&threads, 1, *rq);// 創建 1個生產者線程InitProductor(&threads, 1, *rq);// 啟動所有線程StartAll(threads);// 等待所有線程結束WaitAllThread(threads);return 0;
}
結果演示
這里演示的是單生產者單消費者的模型,可以在主函數改成多生產者多消費者的模型。