線程
文章目錄
- 線程
- I 線程基本概念
- 1、為什么引入線程
- 2、Pthreads
- II 線程基本操作
- 1、創建線程
- 2、終止線程
- 3、線程ID
- 4、連接已終止線程
- 5、線程基本操作示例
- III 通過互斥量同步線程
- 1、基本概念
- 2、互斥量(Mutex)
- 3、靜態分配互斥量
- 4、互斥量鎖定與解鎖
- 5、互斥量的死鎖
- 6、互斥量類型
- 7、動態初始化互斥量
- IV 通過條件變量同步線程
- 1、條件變量
- 2、靜態分配的條件變量
- 3、初始化動態分配的條件變量
- 4、通知和等待條件變量
- 5、示例:生產者-消費者模型
I 線程基本概念
線程是允許應用程序并發執行多個任務的一種機制;一個進程可以包含多個線程,統一程序中的線程會獨立執行相同的程序,且共享同一份全局內存區域。
1、為什么引入線程
- 進程間不共享內存,進程間的信息交換需要通過IPC(進程間通信)機制來實現;同進程的線程之間共享內存空間(包括堆、全局變量等),可直接通過讀寫共享內存來實現線程間高效通信
- fork創建進程代價較高,涉及完整的地址空間復制、文件描述符表復制等資源開銷;線程創建僅需少量寄存器設置和棧空間分配,共享進程資源
- 線程切換開銷遠小于進程切換,因無需切換地址空間和刷新TLB
- 多線程程序能更好利用多核CPU資源,實現真正的并行計算
- 線程間通信延遲更低,適合需要頻繁數據交互的場景
2、Pthreads
Posix統一了Pthreads線程接口的標準,提供了一套跨平臺的線程創建、同步和管理API。
Pthread常用數據類型:
II 線程基本操作
1、創建線程
程序啟動運行時只有一條主線程(main
函數),可以使用pthread_creat()
函數來創建新的子線程:
#include <pthreads.h>int pthread_create(pthread_t *thread, const pthread_attr_t *attr,void *(*start)(void *), void *arg);/*return 0 on success,or a positive errno on error*/
- 該函數通過調用帶有參數
arg
的函數start
開始執行,新建的線程執行start
函數里面的內容,調用pthread_create()
的線程會繼續執行后面的內容。
- 參數
arg
為void *
類型,可以將指向任意對象的指針傳遞給start
函數,需要傳遞多個參數時,可以將arg
指向一個結構體
- 參數
thread
指向pthread_t
類型的緩沖區,在pthread_creat()
返回之前,會在此保存該線程的唯一標識(ID),后續的pthread函數可以通過該表示來引用此線程
- 參數attr指定了新創建線程的各種屬性,設置為NULL,那么創建的線程使用各種默認屬性
2、終止線程
終止線程有很多方式:
- 線程函數
start
執行return
語句并返回值之后線程終止
- 調用
pthread_exit()
和pthread_cancle()
函數取消線程
- 任意線程調用了
exit()
,或者主線程執行了return
語句
pthread_exit終止線程:
#include <pthread.h>void pthread_exit(void *retval);
- 可以在線程的任何地方調用
pthread_exit()
來退出線程,與return
功能相似
- 參數
retval
中保存了線程的返回值,其所指向的內容不應該分配在線程棧中
- 主線程調用
pthread_exit()
后,其他線程還會繼續執行
3、線程ID
進程內部的每個線程都有唯一的標識,稱為線程ID。在Linux中:
- 線程ID在其所屬進程內唯一標識一個線程
- 不同進程中的線程可能具有相同的線程ID(由不同進程的線程ID命名空間隔離)
- 系統范圍內唯一的線程標識可通過pthread_self()結合進程ID實現
獲取自己線程的ID:
#include <pthread.h>pthread_t pthread_self(void);
檢查兩個線程的ID是否相同:
#include <pthread.h>int pthread_equal(pthread_t t1, pthread_t t2);/*return nonezero value if t1 equal to t2, otherwise 0*/
4、連接已終止線程
- 函數
pthread_join()
等待由thread
標識的線程終止,如果線程已經終止則會立即返回
#include <pthread.h>int pthread_join(pthread_t thread, void **retval);/*return 0 on success or a positive error number on error*/
- 參數
thread
指定了需要連接的線程
retval
為一非空指針,用于保存線程終止時返回值的拷貝(即線程return
值或調用pthread_exit()
所指定的值)
- 該函數的功能與進程的
waitpid()
類似,但線程之間的關系是對等的,進程中的任意線程均可以通過該函數與進程中的任意線程建立連接
5、線程基本操作示例
/* Thread handler function for pthread_create */void* handle_create(void *arg){pthread_t tid = pthread_self();printf("I am thread %ld\n",(long int)tid);/* Check if argument is NULL */char *str = (char*)arg;if(str == NULL){printf("nothing recived form str\n");pthread_exit((void*)-1);}/* Print received message */printf("message form pthread_create:\n%s\n",str);return (void*)0;}/* Main function to demonstrate thread creation and joining */int thread_pthread_func(int argc, char *argv[]){/* Check command line arguments */if(argc < 2){printf("Usage:%s <message to thread>\n", argv[0]);return -1;}/* Handle NULL argument case */char *str = (strcmp(argv[1],"NULL") == 0) ? NULL : argv[1]; /* Create new thread */pthread_t pth;int err = pthread_create(&pth, NULL, handle_create, str);if(err != 0){printf("pthread_create error:%s\n", strerror(err));return -1;}/* Wait for thread to complete and get return value */void *ret = NULL ;int join = pthread_join(pth, &ret);if(join != 0){printf("pthread_join error: %s\n", strerror(join));return -1;}printf("thread %ld exit status: %ld\n", (long int)pth, (intptr_t)ret);return 0;}
III 通過互斥量同步線程
1、基本概念
- 若多個線程共享同一資源(文件、變量、內存塊等),當線程1需要讀取這一資源的時候,正好線程2修改了這一資源的值,這時線程1讀取到的資源值已被修改,可能不是預期的值,這可能會帶來無法預期的結果
- 當多個線程共享相同的資源時,需要確保它們訪問這些資源時不會產生沖突或不一致的結果。線程同步就是協調多個線程的執行順序,以確保數據的一致性和正確性。
- 臨界區是指訪問同一共享資源的代碼片段,并且這段代碼的執行應為原子操作
2、互斥量(Mutex)
- 為避免線程更新共享變量時出現問題,可通過互斥量來確保同一時刻僅有一個線程可以訪問這個共享變量
- 一個互斥量有兩種狀態,鎖定狀態和解鎖狀態,任何時候只有一個線程可以鎖定同一互斥量,若有線程試圖鎖定已鎖定的互斥量,該線程將阻塞,直到該互斥量變為解鎖狀態
- 一旦線程鎖定了某個互斥量,這個線程將成為該互斥量的所有者,只有所有者才能給鎖定狀態的互斥量解鎖
- 線程通過訪問互斥量保護的共享資源時,遵循以下流程:
????針對共享資源鎖定互斥量 --> 訪問共享資源 --> 解鎖互斥量
3、靜態分配互斥量
- 互斥量是
pthread_mutex_t
類型的變量,使用前必須對其進行初始化,初始化后的互斥量處于解鎖狀態
pthrea_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
4、互斥量鎖定與解鎖
- 可通過以下函數對互斥量進行鎖定和解鎖,參數
mutex
是需要解鎖或鎖定的互斥量
#include <pthread.h>int pthread_mutex_lock(pthread_mutex_t *mutex);int pthread_mutex_unlock(pthread_mutex_t *mutex);/*return 0 on success or a positive error number on error*/
- 通過
pthread_mutex_lock()
鎖定互斥量時,若互斥量處于未鎖定狀態,則函數鎖定該互斥量并立即返回;若互斥量處于鎖定狀態,則一直阻塞,當互斥量解鎖時鎖定互斥量并返回
- 通過
pthread_mutex_unlock()
解鎖互斥量時,不能對處于未鎖定狀態的互斥量進行解鎖,不能解鎖其他線程鎖定的互斥量
- 示例:多線程修改互斥量保護的全局變量
/* Shared data protected by mutex */static long int data_mutex;/* Mutex for protecting data_mutex */static pthread_mutex_t mtx_lock_unlock = PTHREAD_MUTEX_INITIALIZER;/* Thread function that increments shared data with mutex protection */void *handler_mutex_lock_unlock(void *num){/* Convert argument to loop count */int loop = atoi((char*)num);int ret, tmp;/* Loop to increment shared data */for(int i = 0; i < loop; i++){/* Lock mutex before accessing shared data */ret = pthread_mutex_lock(&mtx_lock_unlock);if(ret != 0){printf("pthread_mutex_lock error :%s\n",strerror(ret));return (void*)-1;}/* Critical section: increment shared data */tmp = data_mutex;tmp++;data_mutex = tmp;/* Unlock mutex after accessing shared data */ret = pthread_mutex_unlock(&mtx_lock_unlock);if(ret != 0){printf("pthread_mutex_unlock error:%s\n", strerror(ret));return (void*)-1;}}return (void*)0;}/* Function to create multiple threads that increment shared data */int main(int argc, char *argv[]){/* Check command line arguments */if(argc < 2 || atoi(argv[1]) < 0){printf("Usage:%s <loop times>", argv[0]);return -1;}/* Create 5 threads */pthread_t thr[5];int errn;for(int i = 0; i < 5; i++){errn = pthread_create(&thr[i], NULL, handler_mutex_lock_unlock, (void*)argv[1]);if(errn != 0){printf("pthread_create error:%s\n", strerror(errn));return -1;}}/* Wait for all threads to complete */void *ret = NULL;for(int j = 0; j < 5; j++){if(pthread_join(thr[j], &ret) != 0){printf("pthread_join error\n");return -1;}if((intptr_t)ret != 0)printf("thread %ld exit error\n", (long int)thr[j]);}/* Print final value of shared data */printf("the value of \"data_mutex\" is %ld \n", data_mutex);return 0;}
5、互斥量的死鎖
- 多個線程在爭奪資源時,因互相等待對方釋放資源而陷入無限阻塞的狀態,例如:
- 有兩個互斥量A和B,兩個線程1和2,線程1鎖定了A之后欲鎖定B,但線程2要在鎖定A之后才能解鎖B,線程2鎖定了B之后欲鎖定A,但線程1要在鎖定B之后才能解鎖A,這樣就會陷入無限阻塞狀態
- 為避免死鎖,可以采取以下策略:
??(1)固定加鎖順序:所有線程按照相同的順序獲取鎖
??(2)使用嘗試加鎖:pthread_mutex_trylock()
避免阻塞
??(3)設置超時機制:pthread_mutex_timedlock()
限制等待時間
??(4)避免嵌套鎖:盡量減少同時持有多個鎖的情況
??(5)使用鎖層次結構:為鎖定義層次關系,只允許按層次獲取
6、互斥量類型
- POSIX標準定義了4種互斥量類型:
??(1)PTHREAD_MUTEX_NORMAL
:標準互斥量,不檢測死鎖和重復加鎖
??(2)PTHREAD_MUTEX_ERRORCHECK
:錯誤檢查互斥量,檢測死鎖和重復加鎖并返回錯誤
??(3)PTHREAD_MUTEX_RECURSIVE
:遞歸互斥量,允許同一線程多次加鎖
??(4)PTHREAD_MUTEX_DEFAULT
:默認類型,通常映射為NORMAL或ERRORCHECK
- 通過
pthread_mutexattr_settype()
設置:
#include <pthread.h>int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int type);/*return 0 on success or a positive error number on error*/
- 示例:
pthread_mutex_t mutex;pthread_mutexattr_t attr;pthread_mutexattr_init(&attr);pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);pthread_mutex_init(&mutex, &attr);pthread_mutexattr_destroy(&attr);
7、動態初始化互斥量
PTHREAD_MUTEX_INITIALIZER
只能用于對靜態分配且擁有默認屬性的互斥量進行初始化
- 對于在堆中或者棧中分配的互斥量必須進行動態初始化,使用后必須手動銷毀
#include <pthread.h>int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);int pthread_mutex_destory(pthread_mutex_t *mutex);/*return 0 on success or a positive error number on error*/
- 參數
mutex
是需要初始化或者銷毀的互斥量,attr
用于定義互斥量的屬性,為NULL表示默認屬性
- 示例:動態初始化互斥鎖
/*mutex to protect shared data*/static pthread_mutex_t mtx_mutex_init;/* Shared data protected by mutex */static long int data_mutex_init = 0;void *handler_mutex_init(){// Lock mutex before accessing shared dataint ret = pthread_mutex_lock(&mtx_mutex_init);if(ret != 0){printf("pthread_lock error :%s\n", strerror(ret));return (void*)-1;}// Critical section: increment shared datalong int tmp = data_mutex_init;for(int i = 0; i < 100000; i++)tmp ++;data_mutex_init = tmp;// Unlock mutex after accessing shared dataret = pthread_mutex_unlock(&mtx_mutex_init);if(ret != 0){printf("pthread_unlock error :%s\n", strerror(ret));return (void*)-1;}return (void*)0;}int thread_mutex_init(){// Initialize mutexint ret = pthread_mutex_init(&mtx_mutex_init, NULL);if(ret != 0){printf("pthread_mutex_init error: %s\n", strerror(ret));return -1;}// Create first threadpthread_t pth_1, pth_2;ret = pthread_create(&pth_1, NULL, handler_mutex_init, NULL);if(ret != 0){printf("pthread_create error:%s\n", strerror(ret));return -1;}// Create second threadret = pthread_create(&pth_2, NULL, handler_mutex_init, NULL);if(ret != 0){printf("pthread_create error:%s\n", strerror(ret));goto clean_up;}// Wait for threads to complete and check their statusvoid *err;if(pthread_join(pth_1, &err) != 0){printf("pthread_join error\n");goto clean_up;}if((intptr_t)err != 0)printf("thread %ld exit error\n", (long int)err);if(pthread_join(pth_2, &err) != 0){printf("pthread_join error\n");goto clean_up;}if((intptr_t)err != 0)printf("thread %ld exit error\n", (long int)err);clean_up:// Clean up mutex resourcesif(pthread_mutex_destroy(&mtx_mutex_init) != 0){printf("pthread_mutes_destory error\n");return -1;}// Print final value of shared dataprintf("the value of data_mutex_init is %ld\n", data_mutex_init);return 0;
}
IV 通過條件變量同步線程
1、條件變量
- 條件變量允許一個線程就某個共享資源的狀態變化通知其他線程
- 條件變量總是與互斥量配合使用,條件變量就共享變量的狀態改變發出通知,互斥變量則提供對共享變量的訪問保護,防止競爭條件
- 主要操作:
pthread_cond_wait()
:等待條件變量pthread_cond_signal()
:喚醒一個等待線程pthread_cond_broadcast()
:喚醒所有等待線程
2、靜態分配的條件變量
- 靜態初始化使用宏
PTHREAD_COND_INITIALIZER
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
3、初始化動態分配的條件變量
- 對于棧或堆中的條件變量需要動態初始化,使用完成后需要銷毀使用
#include <pthread.h>int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);int pthread_cond_destroy(pthread_cond_t *cond);/*return 0 on success or error number on failure*/
- 參數
attr
指定了條件變量的屬性,若為NULL則表示默認屬性
- 示例:
pthread_cond_t cond;pthread_cond_init(&cond, NULL);/*...*/pthread_cond_destroy(&cond);
4、通知和等待條件變量
- 條件變量的基本操作是發送信號和等待,發送信號即在共享變量狀態改變時通知處于等待狀態的線程,等待則是在收到共享變量狀態變化信號前一直處于阻塞狀態
#include <pthread.h>int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);int pthread_cond_signal(pthread_cond_t *cond);int pthread_cond_broadcast(pthread_cond_t *cond);/*return 0 on success or error number on failure*/
- 參數
cond
是指向條件變量的指針,參數mutex
是指向與條件變量配合使用的互斥量的指針
pthread_cond_signal()
只會喚醒一條處于等待狀態的線程,只有一條需要喚醒的線程時推薦使用
pthread_cond_broadcast()
會喚醒所有處于等待狀態的線程,有多條線程等待時推薦使用
pthread_cond_wait()
的執行過程:
??(1)解鎖互斥量
??(2)使調用線程進入等待狀態(阻塞)
??(3)當被其他線程通過signal/broadcast喚醒時,重新獲取互斥鎖
??(4)只有成功獲取互斥鎖后,函數才會返回
5、示例:生產者-消費者模型
- 多線程消費者-生產者模型,多位消費者與多位生產者,生產者生產一件商品后通知一個處于等待中的消費者,通過互斥量和條件變量實現線程同步
/* Mutex for producer-consumer synchronization */
pthread_mutex_t mtx_cp = PTHREAD_MUTEX_INITIALIZER;
/* Condition variable for producer-consumer synchronization */
pthread_cond_t con_cp = PTHREAD_COND_INITIALIZER;
/* Count of available products */
static int available = 0;
/* Count of consumed products */
static int consumed = 0;/* Producer thread function that creates products */
void *handler_productor(){sleep(1);/* Lock mutex before accessing shared data */int err = pthread_mutex_lock(&mtx_cp);if(err != 0){printf("mutex lock error: %s\n", strerror(err));return (void*)-1;}/* Increment available products count */available++;printf("one product produced, we will inform the consumer\n");/* Signal consumer that product is available */err = pthread_cond_signal(&con_cp);if(err != 0){printf("cont signal error: %s\n", strerror(err));pthread_mutex_unlock(&mtx_cp);return (void*)-1;}/* Unlock mutex after accessing shared data */err = pthread_mutex_unlock(&mtx_cp);if(err != 0){printf("mutex unlock error: %s\n", strerror(err));return (void*)-1;}return (void*)0;
}/* Consumer thread function that consumes products */
void *handler_consumer(){/* Lock mutex before checking shared data */int err = pthread_mutex_lock(&mtx_cp);if(err != 0){printf("mutex lock error:%s\n", strerror(err));return (void*)-1;}/* Wait while no products are available */while(available <= 0){err = pthread_cond_wait(&con_cp, &mtx_cp);if(err != 0){printf("cont wait error:%s\n", strerror(err));pthread_mutex_unlock(&mtx_cp);return (void*)-1;}}/* Consume product and update counters */available--;consumed++;printf("one product has benn consumed\n");/* Unlock mutex after accessing shared data */err = pthread_mutex_unlock(&mtx_cp);if(err != 0){printf("mutex unlock error:%s\n", strerror(err));return (void*)-1;}return (void*)0;
}/* Main function for producer-consumer demonstration */
int thread_producter_consumer(int argc, char *argv[]){/* Check command line arguments */if(argc < 2 || atoi(argv[1]) <= 0){printf("Usage:%s <consumer number>\n", argv[0]);return -1;}int err, tmp = atoi(argv[1]);/* Create producer and consumer threads */pthread_t pth_p[tmp],pth_c[tmp];for(int i = 0; i < tmp; i++){err = pthread_create(&pth_p[i], NULL, handler_productor, NULL);if(err != 0){printf("thread created error:%s\n", strerror(err));return -1;}err = pthread_create(&pth_c[i], NULL, handler_consumer, NULL);if(err != 0){printf("thread created error:%s\n", strerror(err));return -1;}}/* Wait for all threads to complete */for(int i = 0; i < tmp; i++){pthread_join(pth_p[i], NULL);pthread_join(pth_c[i], NULL);}/* Print total consumed products */printf("the consumer has totally consumed %d producteds\n", consumed);return 0;
}printf("thread created error:%s\n", strerror(err));return -1;}err = pthread_create(&pth_c[i], NULL, handler_consumer, NULL);if(err != 0){printf("thread created error:%s\n", strerror(err));return -1;}}/* Wait for all threads to complete */for(int i = 0; i < tmp; i++){pthread_join(pth_p[i], NULL);pthread_join(pth_c[i], NULL);}/* Print total consumed products */printf("the consumer has totally consumed %d producteds\n", consumed);return 0;
}