【條件變量導讀】條件變量是多線程中比較靈活而且容易出錯的線程同步手段,比如:虛假喚醒、為啥條件變量要和互斥鎖結合使用?windows和linux雙平臺下,初始化、等待條件變量的api一樣嗎?
本文將分別為您介紹條件變量在windows和linux平臺下的用法和注意事項,好!直接進入主題。
條件變量的使用場景可以用如下流程圖進行闡述。
我們需反復判斷一個多線程共享條件是否滿足,一直到該條件滿足為止(由于該條件被多個線程操作)。因此每次判斷前進行加鎖操作,判斷完畢后解鎖。但上述邏輯存在嚴重的效率問題,假設我們解鎖離開臨界區后,其他線程修改了條件,導致條件滿足了;此時程序仍然需要睡眠 n 秒后才能得到反饋。因此我們需要這樣一種機制:
某個線程 A 在條件不滿足的情況下,主動讓出互斥鎖,讓其他線程去爭奪這把鎖,當前線程A在此處等待,等待條件的滿足;一旦條件滿足,其他線程釋放鎖,并通知條件滿足,線程A就可以被立刻喚醒并能獲取到互斥鎖對象。
1、Windows下條件變量的用法
具體條件變量的定義和api,我就不介紹了,大家參考如下示例程序,就能很輕松地掌握條件變量地初始化,本文地重點是介紹條件變量地用法及注意事項。
#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <list>class ThreadTask
{public:ThreadTask(int taskId){m_taskId = taskId;}void doTask(){std::cout << " threadId: " << std::this_thread::get_id() << " do Task, taskId: " << m_taskId << std::endl;}private:int m_taskId;
};/定義全局互斥鎖對象
std::mutex myMutex;
//定義全局的windows條件變量
std::condition_variable myCv;
/全局任務隊列
std::list<ThreadTask*> taskList;void* consumeThread()
{while (true){/判全局條件(公共隊列taskList是否為空)前,先加鎖std::unique_lock<std::mutex> lk(myMutex);while (taskList.empty()){ /*如果條件不滿足,那繼續等待條件變量滿足條件同時立刻讓出剛占有的互斥鎖對象,讓其他線程去爭搶*/myCv.wait(lk); } //假設條件滿足了,當前線程將從myCv.wait(lk)返回,//并立刻獲取互斥鎖對象操作公共的全局隊列ThreadTask* pTask = taskList.front();//頭部彈任務taskList.pop_front();if (!pTask)continue;pTask->doTask();delete pTask;pTask = nullptr;}return nullptr;
}void* produceThread()
{int taskId = 0;while (true){ThreadTask* pTask = nullptr;{std::lock_guard<std::mutex> lk(myMutex);taskId++;pTask = new ThreadTask(taskId);taskList.push_back(pTask);std::cout << "thread: " << std::this_thread::get_id() << " produce a Task, taskId: " << taskId << std::endl;}/*生產完任務,通知消費線程consumeThread條件滿足釋放鎖資源myMutex*/myCv.notify_one();std::this_thread::sleep_for(std::chrono::seconds(1));}return nullptr;
}int main()
{std::thread consumeThread1(consumeThread);std::thread consumeThread2(consumeThread);std::thread consumeThread3(consumeThread);std::thread produceThread(produceThread);if (produceThread.joinable())produceThread.join();if (consumeThread1.joinable())consumeThread1.join();if (consumeThread2.joinable())consumeThread2.join();if (consumeThread3.joinable())consumeThread3.join();return 0;
}
程序運行的結果:
可以看出生產線程生產完任務塞到公共隊列中去,通知消費線程去公共隊列中取任務,一共四個線程在操作公共隊列taskList,并沒有出現資源沖突的情況。這便是條件變量使用的妙處!
從上述代碼中可以看到,條件變量竟然在等待一把互斥鎖。
std::unique_lock<std::mutex> lk(myMutex);
while (taskList.empty())myCv.wait(lk);
為啥條件變量要和互斥鎖配合一起使用?我們可以假設下面這段偽碼,互斥鎖和條件變量分開使用。
lock(myMutex)
while (taskList.empty())
{//釋放鎖unlock(myMutex);/再等待條件cvcond_wait(&cv);//再加鎖lock(myMutex)
}
假設線程當前線程(線程A)執行到第5行代碼,釋放了鎖,此時操作系統把CPU時間片分配給另外一個等待myMutex的線程B,隨后線程B釋放信號,表明條件cv已經滿足,等到線程A爭搶到CPU時間片之后,就已經錯過了線程B釋放的信號了,那么線程B將永遠阻塞在cond_wait()接口上。
解鎖和等待條件變量必須是原子性的操作,要么都成功,要么都不成功,否則就很難保證線程的同步。
還有虛假喚醒的問題,何為虛假喚醒,就是 myCv.wait(lk)接口突然返回了,但它并不是被其它線程的信號喚醒的,可能是被操作系統某個中斷信號給喚醒的,此時并沒有相應的任務需要處理,如果繼續讓線程走下去,就可能會有問題,所以為了防止這種虛假喚醒的現象,我們外部循環去判斷公共隊列是否為空,如果為空,那就繼續等待。這是Linux服務端面試必問的考點,請同學們慎重。
好,介紹完條件變量在windows下的用法,那么接著看下條件變量在linux下的用法。
2、Linux下條件變量的用法
條件變量的用法流程和windows的差不多,主要差異就是創建線程、初始化條件變量、等待條件變量的api接口不一樣。
那,直接上代碼!
#include <iostream>
#include <pthread.h>
#include <error.h>
#include <list>
#include <unistd.h>
#include <semaphore.h>
using namespace std;class ThreadTask
{public:ThreadTask(int taskId){m_taskId = taskId;}void doTask(){cout << " doTask taskId : " << m_taskId << " thread Id: " << pthread_self() << endl;}private:int m_taskId;
};pthread_mutex_t myMutex;
pthread_cond_t myCond;
list<ThreadTask*> taskList; void* consumeThread(void* param)
{while(true){pthread_mutex_lock(&myMutex);while(taskList.empty()){pthread_cond_wait(&myCond, &myMutex); }ThreadTask* pTask = taskList.front();taskList.pop_front();pthread_mutex_unlock(&myMutex);if (pTask == nullptr)continue;pTask->doTask();delete pTask;pTask = nullptr;}return NULL;
}
void* produceThread(void* param)
{int taskID = 0;ThreadTask* pTask = NULL;while (true){pTask = new ThreadTask(taskID);pthread_mutex_lock(&myMutex);taskList.push_back(pTask);std::cout << "produce a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl; pthread_mutex_unlock(&myMutex);//釋放信號量,通知消費者線程pthread_cond_signal(&myCond);taskID++;sleep(1);}return NULL;
}int main()
{pthread_mutex_init(&myMutex, NULL);pthread_cond_init(&myCond, NULL);//創建3個消費者線程pthread_t consumerThreadID[5];for (int i = 0; i < 3; ++i){pthread_create(&consumerThreadID[i], NULL, consumeThread, NULL);}//創建一個生產者線程pthread_t producerThreadID;pthread_create(&producerThreadID, NULL, produceThread, NULL);pthread_join(producerThreadID, NULL);for (int i = 0; i < 3; ++i){pthread_join(consumerThreadID[i], NULL);}pthread_cond_destroy(&myCond);pthread_mutex_destroy(&myMutex); return 0;
}
Linux平臺下運行的結果: