文章目錄
- 非并發的一寫一讀環形隊列
- 多讀多寫環形隊列
非并發的一寫一讀環形隊列
讀指針:
1、先判斷是否有數據
2、讀取數據
3、操作指針
寫指針:
1、先判斷空間是否足夠
2、寫入數據
3、操作指針·
所以代碼也十分簡單:
bool putqueue(void* pData)
{ST_NODE* ptr = NULL;do {ptr = pWrite;if ((uiQueueSize - ((pWrite + uiQueueSize - pRead) % uiQueueSize) - 1 > 0) || (NULL != *ptr)) // 隊列滿了 {return false;}} while(!_sync_bool_compare_and_swap(pWrite, ptr, ptr + 1)) // 競爭到寫指針if (pWrite >= pstBegin) {pWrite = pstBegin;}*ptr = pData;return true;
}
那么在多線程多讀多寫的情況下,該如何設計呢?
多讀多寫環形隊列
核心問題是:
1、多個線程如何競爭操作一個指針?
思路:利用CAS(compare & swap)確保只有一個線程能把指針從當前位置指向下一個位置
2、先操作指針還是先操作數據?
- 先操作指針,有可能導致數據還沒讀,就被寫入方覆蓋
- 先讀/寫數據,可能無法競爭到指針導致錯誤
- 解決方案:標記法,已讀取得數據置為NULL,未讀數據為實際數據得指針,讀寫前先判斷標記。
void* getqueue()
{ST_NODE* ptr = NULL;ST_NODE* current = NULL;do {ptr = pRead;if (((pWrite + uiQueueSize - pRead) % uiQueueSize) > 0 || (NULL != ptr)) // 隊列空{return NULL;}} while(!_sync_bool_compare_and_swap(pRead, ptr, ptr + 1)) // 競爭到讀指針if (pRead >= pstEnd) {pRead = pstEnd;}current = *ptr;*ptr = NULL;return *current;
}
此時也會出現一些極端的問題:
1、CAS指令的ABA問題:兩個線程同時讀/寫同一個位置,第一個線程獲取讀/寫權限后,第二個線程掛起。
指針有可能轉一圈回到原來位置,導致第二個線程恢復運行,從而第二個線程CAS成功。極端情況下會導致讀指針越過寫指針。
解決方案:通過一個唯一id:seq替換指針,seq為64位整數,自增且永不重復。指針 = 隊列首地址 + seq % 隊列長度
class mqueue
{
public:mqueue() {read_seq_ = write_seq_ = 0;memset(queue_arr_, 0, sizeof(queue_arr_));}bool push_back(void* data); // 插入元素void* pop_front(); // 取出元素
private:void* queue_arr_[MAXN];volatile uint64_t read_seq_;volatile uint64_t write_seq_;
};bool mqueue::push_back(void* data)
{do {uint64_t cur_seq = write_seq_; // 保留原始值,避免處理過程被其他線程改變if (cur_seq >= read_seq_ + MAXN || queue_arr_[cur_seq % MAXN]){return false; // 隊列滿了,等讀線程讀取}if (_sync_bool_compare_and_swap(&write_seq_, cur_seq, cur_seq + 1)){queue_arr_[cur_seq % MAXN] = data;return true;}} while (true);
}void* mqueue::pop_front()
{do{uint64_t cur_seq = read_seq_; // 保留原始值,避免處理過程被其他線程改變if (cur_seq >= write_seq_ || queue_arr_[cur_seq % MAXN] == NULL){return NULL; // 隊列空,等待寫線程寫入 }if (_sync_bool_compare_and_swap(&read_seq_, cur_seq, cur_seq + 1)){void* data = queue_arr_[cur_seq % MAXN];queue_arr_[cur_seq % MAXN] = NULL;return data;}} while (true);
}