文章目錄
- XSI
- semget
- semop、semtimedop
- semctl
- 基于共享內存demo修改
- XSI信號量的限制
- PV原語
- PV控制并發進程數
- POSIX信號量
- 使用posix命名信號量
- 使用posix匿名信號量
- 參考
在前兩篇文章中我們使用的racingdemo都沒有對臨界區代碼進行加鎖,這里我們介紹以下信號量的使用。
Linux環境下主要實現的信號量有兩種。根據標準的不同,它們跟共享內存類似,一套XSI的信號量,一套POSIX的信號量。下面我們分別使用它們實現一套類似文件鎖的方法,來簡單看看它們的使用。
XSI
XSI信號量就是內核實現的一個計數器,可以對計數器做甲減操作,并且操作時遵守一些基本操作原則,即:對計數器做加操作立即返回,做減操作要檢查計數器當前值是否夠減?(減被減數之后是否小于0)如果夠,則減操作不會被阻塞;如果不夠,則阻塞等待到夠減為止。
調用API如下:
semget
#include <sys/sem.h>int semget(key_t key, int nsems, int semflg);
可以使用semget創建或者打開一個已經創建的信號量數組。
key用來標識系統內的信號量。這里除了可以使用ftok產生以外,還可以使用IPC_PRIVATE創建一個沒有key的信號量。
如果指定的key已經存在,則意味著打開這個信號量,這時nsems參數指定為0,semflg參數也指定為0。
nsems參數表示在創建信號量數組的時候,這個數組中的信號量個數是幾個。
semflg參數用來指定標志位,主要有:IPC_CREAT,IPC_EXCL和權限mode。
semop、semtimedop
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>int semop(int semid, struct sembuf *sops, size_t nsops);int semtimedop(int semid, struct sembuf *sops, size_t nsops, const struct timespec *timeout);
使用semop調用來對信號量數組進行操作。nsops指定對數組中的幾個元素進行操作,如數組中只有一個信號量就指定為1。操作的所有參數都定義在一個sembuf結構體里:
unsigned short sem_num; /* semaphore number */
short sem_op; /* semaphore operation */
short sem_flg; /* operation flags */
sem_flg
可以指定的參數包括IPC_NOWAIT
和SEM_UNDO
當制定了SEM_UNDO
,進程退出的時候會自動UNDO
它對信號量的操作。
對信號量的操作會作用在指定的第sem_num個信號量。一個信號量集合中的第1個信號量的編號從0開始。所以,對于只有一個信號量的信號集,這個sem_num應指定為0。
sem_op用來指定對信號量的操作,可以有的操作有三種:
- 正值操作:對信號量計數器的值(semval)進行加操作。
- 0值操作:對計數器的值沒有影響,而且要求對進程對信號量必須有讀權限。實際上這個行為是一個“等待計數器為0”的操作:
- 如果計數器的值為0,則操作可以立即返回。如果不是0并且sem_flg被設置為IPC_NOWAIT的情況下,0值操作也不會阻塞,而是會立即返回,并且errno被設置為EAGAIN。
- 如果不是0,且沒設置IPC_NOWAIT時,操作會阻塞,直到計數器值變成0為止,此時相關信號量的semncnt值會加1,這個值用來記錄有多少個進程(線程)在此信號量上等待。
- 除了計數器變為0會導致阻塞停止以外,還有其他情況也會導致停止等待:信號量被刪除,semop操作會失敗,并且errno被置為EIDRM。進程被信號(signal)打斷,errno會被置為EINTR,切semzcnt會被正常做減處理。
- 負值操作:對計數器做減操作,且進程對信號量必須有寫權限。
- 如果當前計數器的值大于或等于指定負值的絕對值,則semop可以立即返回,并且計數器的值會被置為減操作的結果。
- 如果sem_op的絕對值大于計數器的值semval,則說明目前不夠減。如果sem_flg設置了IPC_NOWAIT,semop操作依然會立即返回并且errno被置為EAGAIN。如果沒設置IPC_NOWAIT,則會阻塞,直到以下幾種情況發生為止:
- semval的值大于或等于sem_op的絕對值,這時表示有足夠的值做減法了。
- 信號量被刪除,semop返回EIDRM。
- 進程(線程)被信號打斷,semop返回EINTR。
semtimedop
提供了一個帶超時機制的結構,以便實現等待超時。
semctl
觀察semop的行為我們會發現,有必要在一個信號量創建之后對其默認的計數器semval進行賦值。所以,我們需要在semop之前,使用semctl進行賦值操作。
int semctl(int semid, int semnum, int cmd, ...);
這個調用是一個可變參實現,具體參數要根據cmd的不同而變化。在一般的使用中,我們主要要學會使用它改變semval的值和查看、修改sem的屬性。相關的cmd為:SETVAL
、IPC_RMID
、IPC_STAT
。
修改:
semctl(semid, 0, SETVAL, 1);
這個調用可以將指定的sem的semval值設置為1。更具體的參數解釋大家可以參考man 2 semctl。
基于共享內存demo修改
參考:https://blog.csdn.net/qq_42604176/article/details/123449737?spm=1001.2014.3001.5501的XSI示例
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/file.h>
#include <wait.h>
#include <sys/mman.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/sem.h>#define COUNT 100
#define PATHNAME "/etc/passwd"static int lockid;// 初始化信號量
int mylock_init(void)
{int semid;semid = semget(IPC_PRIVATE, 1, IPC_CREAT|0600);if (semid < 0) {perror("semget()");return -1;}if (semctl(semid, 0, SETVAL, 1) < 0) {perror("semctl()");return -1;}return semid;
}
// 銷毀信號量
void mylock_destroy(int lockid)
{semctl(lockid, 0, IPC_RMID);
}// 信號量值--,表示鎖住
int mylock(int lockid)
{struct sembuf sbuf;sbuf.sem_num = 0;sbuf.sem_op = -1;sbuf.sem_flg = 0;// 為什么是while循環?// 防止進程(線程)被信號打斷,semop返回EINTRwhile (semop(lockid, &sbuf, 1) < 0) {if (errno == EINTR) {continue;}perror("semop()");return -1;}return 0;
}// 信號量++, 表示解鎖
int myunlock(int lockid)
{struct sembuf sbuf;sbuf.sem_num = 0;sbuf.sem_op = 1;sbuf.sem_flg = 0;if (semop(lockid, &sbuf, 1) < 0) {perror("semop()");return -1;}return 0;
}
// 參考https://blog.csdn.net/qq_42604176/article/details/123449737?spm=1001.2014.3001.5501 的XSI的do_child 看看兩者有何不同呢
int do_child(int proj_id)
{int interval;int *shm_p, shm_id;key_t shm_key;/* 使用ftok產生shmkey */if ((shm_key = ftok(PATHNAME, proj_id)) == -1) {perror("ftok()");exit(1);}/* 在子進程中使用shmget取到已經在父進程中創建好的共享內存id,注意shmget的第三個參數的使用。 */shm_id = shmget(shm_key, sizeof(int), 0);if (shm_id < 0) {perror("shmget()");exit(1);}/* 使用shmat將相關共享內存段映射到本進程的內存地址。 */shm_p = (int *)shmat(shm_id, NULL, 0);if ((void *)shm_p == (void *)-1) {perror("shmat()");exit(1);}/* critical section */// 對于臨界區代碼進行加鎖解鎖if (mylock(lockid) == -1) {exit(1);}interval = *shm_p;interval++;usleep(1);*shm_p = interval;if (myunlock(lockid) == -1) {exit(1);}/* critical section *//* 使用shmdt解除本進程內對共享內存的地址映射,本操作不會刪除共享內存。 */if (shmdt(shm_p) < 0) {perror("shmdt()");exit(1);}exit(0);
}int main()
{pid_t pid;int count;int *shm_p;int shm_id, proj_id;key_t shm_key;// 初始化信號量lockid = mylock_init();if (lockid == -1) {exit(1);}proj_id = 1234;/* 使用約定好的文件路徑和proj_id產生shm_key。 */if ((shm_key = ftok(PATHNAME, proj_id)) == -1) {perror("ftok()");exit(1);}/* 使用shm_key創建一個共享內存,如果系統中已經存在此共享內存則報錯退出,創建出來的共享內存權限為0600。 */shm_id = shmget(shm_key, sizeof(int), IPC_CREAT|IPC_EXCL|0600);if (shm_id < 0) {perror("shmget()");exit(1);}/* 將創建好的共享內存映射進父進程的地址以便訪問。 */shm_p = (int *)shmat(shm_id, NULL, 0);if ((void *)shm_p == (void *)-1) {perror("shmat()");exit(1);}/* 共享內存賦值為0。 */*shm_p = 0;/* 打開100個子進程并發讀寫共享內存。 */for (count=0;count<COUNT;count++) {pid = fork();if (pid < 0) {perror("fork()");exit(1);}if (pid == 0) {do_child(proj_id);}}/* 等待所有子進程執行完畢。 */for (count=0;count<COUNT;count++) {wait(NULL);}/* 顯示當前共享內存的值。 */printf("shm_p: %d\n", *shm_p);/* 解除共享內存地質映射。 */if (shmdt(shm_p) < 0) {perror("shmdt()");exit(1);}/* 刪除共享內存。 */if (shmctl(shm_id, IPC_RMID, NULL) < 0) {perror("shmctl()");exit(1);}// 銷毀信號量mylock_destroy(lockid);exit(0);
}
編譯運行結果:
[root@VM-90-225-centos /home/hanhan/SocketTest/LocalSocketDemo]# g++ ./racing_posix_shm.cpp -lrt -o racing_posix_shm
[root@VM-90-225-centos /home/hanhan/SocketTest/LocalSocketDemo]# ./racing_posix_shm
shm_p: 100
XSI信號量的限制
系統中對于XSI信號量的限制都放在一個文件中,路徑為:/proc/sys/kernel/sem。文件中包涵4個限制值,它們分別的含義是:
[root@VM-90-225-centos /]# cat /proc/sys/kernel/sem
32000 1024000000 500 32000
SEMMSL:一個信號量集(semaphore set)中,最多可以有多少個信號量。這個限制實際上就是semget調用的第二個參數的個數上限。
SEMMNS:系統中在所有信號量集中最多可以有多少個信號量。
SEMOPM:可以使用semop系統調用指定的操作數限制。這個實際上是semop調用中,第二個參數的結構體中的sem_op的數字上限。
SEMMNI:系統中信號量的id標示數限制。就是信號量集的個數上限。
PV原語
PV操作是操作系統原理中的重點內容之一,而根據上述的互斥鎖功能的描述來看,實際上我們的互斥鎖就是一個典型的PV操作。加鎖行為就是P操作,解鎖就是V操作。PV操作是計算機操作系統需要提供的基本功能之一。我們都知道現在的計算機基本都是多核甚至多CPU的場景,所以很多計算任務如果可以并發執行,那么無疑可以增加計算能力。假設我們使用多進程的方式進行并發運算,那么并發多少個進程合適呢?雖然說這個問題會根據不同的應用場景發生變化,但是如果假定是一個極度消耗CPU的運算的話,那么無疑有幾個CPU就應該并發幾個進程。此時并發個數如果過多,則會增加調度開銷導致整體吞度量下降,而過少則無法利用多個CPU核心。
下面我們將用PV操作源于控制同時進行運算的進程個數。對于互斥鎖來說,計數器的初值為1,而對于這個PV操作,計數器的初值設置為當前計算機的核心個數。應用采用并發的方式找到10010001到10020000數字范圍內質數,并控制并發的進程數為計算機核心數。
整個進程組的執行邏輯可以描述為,父進程需要運算判斷10010001到10020000數字范圍內所有出現的質數,采用每算一個數打開一個子進程的方式。為控制同時進行運算的子進程個數不超過CPU個數,所以申請了一個值為CPU個數的信號量計數器,每創建一個子進程,就對計數器做P操作,子進程運算完推出對計數器做V操作。由于P操作在計數器是0的情況下會阻塞,直到有其他子進程退出時使用V操作使計數器加1,所以整個進程組不會產生大于CPU個數的子進程進行任務的運算。
PV控制并發進程數
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <signal.h>#define START 10010001
#define END 10020000
#define NPROC 4static int pv_id;int mysem_init(int n)
{int semid;semid = semget(IPC_PRIVATE, 1, IPC_CREAT|0600);if (semid < 0) {perror("semget()");return -1;}if (semctl(semid, 0, SETVAL, n) < 0) {perror("semctl()");return -1;}return semid;
}void mysem_destroy(int pv_id)
{semctl(pv_id, 0, IPC_RMID);
}int P(int pv_id)
{struct sembuf sbuf;sbuf.sem_num = 0;sbuf.sem_op = -1;sbuf.sem_flg = 0;while (semop(pv_id, &sbuf, 1) < 0) {if (errno == EINTR) {continue;}perror("semop(p)");return -1;}return 0;
}int V(int pv_id)
{struct sembuf sbuf;sbuf.sem_num = 0;sbuf.sem_op = 1;sbuf.sem_flg = 0;if (semop(pv_id, &sbuf, 1) < 0) {perror("semop(v)");return -1;}return 0;
}int prime_proc(int n)
{int i, j, flag;flag = 1;for (i=2;i<n/2;++i) {if (n%i == 0) {flag = 0;break;}}if (flag == 1) {printf("%d is a prime\n", n);}/* 子進程判斷完當前數字退出之前進行V操作 */V(pv_id);exit(0);
}void sig_child(int sig_num)
{while (waitpid(-1, NULL, WNOHANG) > 0);
}int main(void)
{pid_t pid;int i;/* 當子進程退出的時候使用信號處理進行回收,以防止產生很多僵尸進程 */if (signal(SIGCHLD, sig_child) == SIG_ERR) {perror("signal()");exit(1);}pv_id = mysem_init(NPROC);/* 每個需要運算的數字都打開一個子進程進行判斷 */for (i=START;i<END;i+=2) {/* 創建子進程的時候進行P操作。 */P(pv_id);pid = fork();if (pid < 0) {/* 如果創建失敗則應該V操作 */V(pv_id);perror("fork()");exit(1);}if (pid == 0) {/* 創建子進程進行這個數字的判斷 */prime_proc(i);}}/* 在此等待所有數都運算完,以防止運算到最后父進程先mysem_destroy,導致最后四個子進程進行V操作時報錯 */while (1) {sleep(1);};mysem_destroy(pv_id);exit(0);
}
這段代碼使用了信號處理的方式回收子進程,以防產生過多的僵尸進程。使用這個方法引出的問題在于,如果父進程不在退出前等所有子進程回收完畢,那么父進程將在最后幾個子進程執行完之前就將信號量刪除了,導致最后幾個子進程進行V操作的時候會報錯。
POSIX信號量
POSIX提供了一套新的信號量原語
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>sem_t *sem_open(const char *name, int oflag);
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
使用sem_open來創建或訪問一個已經創建的POSIX信號量。創建時,可以使用value參數對其直接賦值。
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
sem_wait會對指定信號量進行減操作,如果信號量原值大于0,則減操作立即返回。如果當前值為0,則sem_wait會阻塞,直到能減為止。
int sem_post(sem_t *sem);
sem_post用來對信號量做加操作。這會導致某個已經使用sem_wait等在這個信號量上的進程返回。
int sem_getvalue(sem_t *sem, int *sval);
sem_getvalue用來返回當前信號量的值到sval指向的內存地址中。如果當前有進程使用sem_wait等待此信號量,POSIX可以允許有兩種返回,一種是返回0,另一種是返回一個負值,這個負值的絕對值就是等待進程的個數。Linux默認的實現是返回0。
int sem_unlink(const char *name);int sem_close(sem_t *sem);
使用sem_close可以在進程內部關閉一個信號量,sem_unlink可以在系統中刪除信號量。
POSIX信號量實現的更清晰簡潔,相比之下,XSI信號量更加復雜,但是卻更佳靈活,應用場景更加廣泛。在XSI信號量中,對計數器的加和減操作都是通過semop方法和一個sembuff的結構體來實現的,但是在POSIX中則給出了更清晰的定義:使用sem_post函數可以增加信號量計數器的值,使用sem_wait可以減少計數器的值。如果計數器的值當前是0,則sem_wait操作會阻塞到值大于0。
POSIX信號量也提供了兩種方式的實現,命名信號量和匿名信號量。這有點類似XSI方式使用ftok文件路徑創建和IPC_PRIVATE方式創建的區別。但是表現形式不太一樣:
命名信號量:
命名信號量實際上就是有一個文件名的信號量。跟POSIX共享內存類似,信號量也會在/dev/shm目錄下創建一個文件,如果有這個文件名就是一個命名信號量。其它進程可以通過這個文件名來通過sem_open方法使用這個信號量。除了訪問一個命名信號量以外,sem_open方法還可以創建一個信號量。創建之后,就可以使用sem_wait、sem_post等方法進行操作了。這里要注意的是,一個命名信號量在用sem_close關閉之后,還要使用sem_unlink刪除其文件名,才算徹底被刪除。
匿名信號量:
一個匿名信號量僅僅就是一段內存區,并沒有一個文件名與之對應。匿名信號量使用sem_init進行初始化,使用sem_destroy()銷毀。操作方法跟命名信號量一樣。匿名內存的初始化方法跟sem_open不一樣,sem_init要求對一段已有內存進行初始化,而不是在/dev/shm下產生一個文件。這就要求:如果信號量是在一個進程中的多個線程中使用,那么它所在的內存區應該是這些線程應該都能訪問到的全局變量或者malloc分配到的內存。如果是在多個進程間共享,那么這段內存應該本身是一段共享內存(使用mmap、shmget或shm_open申請的內存)
使用posix命名信號量
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/file.h>
#include <wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <semaphore.h>#define COUNT 100
#define SHMPATH "/shm"
#define SEMPATH "/sem"static sem_t *sem;sem_t *mylock_init(void)
{sem_t * ret;ret = sem_open(SEMPATH, O_CREAT|O_EXCL, 0600, 1);if (ret == SEM_FAILED) {perror("sem_open()");return NULL;}return ret;
}void mylock_destroy(sem_t *sem)
{sem_close(sem);sem_unlink(SEMPATH);
}int mylock(sem_t *sem)
{while (sem_wait(sem) < 0) {if (errno == EINTR) {continue;}perror("sem_wait()");return -1;}return 0;
}int myunlock(sem_t *sem)
{if (sem_post(sem) < 0) {perror("semop()");return -1;}
}int do_child(char * shmpath)
{int interval, shmfd, ret;int *shm_p;shmfd = shm_open(shmpath, O_RDWR, 0600);if (shmfd < 0) {perror("shm_open()");exit(1);}shm_p = (int *)mmap(NULL, sizeof(int), PROT_WRITE|PROT_READ, MAP_SHARED, shmfd, 0);if (MAP_FAILED == shm_p) {perror("mmap()");exit(1);}/* critical section */mylock(sem);interval = *shm_p;interval++;usleep(1);*shm_p = interval;myunlock(sem);/* critical section */munmap(shm_p, sizeof(int));close(shmfd);exit(0);
}int main()
{pid_t pid;int count, shmfd, ret;int *shm_p;sem = mylock_init();if (sem == NULL) {fprintf(stderr, "mylock_init(): error!\n");exit(1);}shmfd = shm_open(SHMPATH, O_RDWR|O_CREAT|O_TRUNC, 0600);if (shmfd < 0) {perror("shm_open()");exit(1);}ret = ftruncate(shmfd, sizeof(int));if (ret < 0) {perror("ftruncate()");exit(1);}shm_p = (int *)mmap(NULL, sizeof(int), PROT_WRITE|PROT_READ, MAP_SHARED, shmfd, 0);if (MAP_FAILED == shm_p) {perror("mmap()");exit(1);}*shm_p = 0;for (count=0;count<COUNT;count++) {pid = fork();if (pid < 0) {perror("fork()");exit(1);}if (pid == 0) {do_child(SHMPATH);}}for (count=0;count<COUNT;count++) {wait(NULL);}printf("shm_p: %d\n", *shm_p);munmap(shm_p, sizeof(int));close(shmfd);shm_unlink(SHMPATH);sleep(3000);mylock_destroy(sem);exit(0);
}
編譯:
g++ demo.cpp -lrt -lpthread -o demo
使用posix匿名信號量
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/file.h>
#include <wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <semaphore.h>#define COUNT 100
#define SHMPATH "/shm"static sem_t *sem;void mylock_init(void)
{sem_init(sem, 1, 1);
}void mylock_destroy(sem_t *sem)
{sem_destroy(sem);
}int mylock(sem_t *sem)
{while (sem_wait(sem) < 0) {if (errno == EINTR) {continue;}perror("sem_wait()");return -1;}return 0;
}int myunlock(sem_t *sem)
{if (sem_post(sem) < 0) {perror("semop()");return -1;}
}int do_child(char * shmpath)
{int interval, shmfd, ret;int *shm_p;shmfd = shm_open(shmpath, O_RDWR, 0600);if (shmfd < 0) {perror("shm_open()");exit(1);}shm_p = (int *)mmap(NULL, sizeof(int), PROT_WRITE|PROT_READ, MAP_SHARED, shmfd, 0);if (MAP_FAILED == shm_p) {perror("mmap()");exit(1);}/* critical section */mylock(sem);interval = *shm_p;interval++;usleep(1);*shm_p = interval;myunlock(sem);/* critical section */munmap(shm_p, sizeof(int));close(shmfd);exit(0);
}int main()
{pid_t pid;int count, shmfd, ret;int *shm_p;sem = (sem_t *)mmap(NULL, sizeof(sem_t), PROT_WRITE|PROT_READ, MAP_SHARED|MAP_ANONYMOUS, -1, 0);if ((void *)sem == MAP_FAILED) {perror("mmap()");exit(1);}mylock_init();shmfd = shm_open(SHMPATH, O_RDWR|O_CREAT|O_TRUNC, 0600);if (shmfd < 0) {perror("shm_open()");exit(1);}ret = ftruncate(shmfd, sizeof(int));if (ret < 0) {perror("ftruncate()");exit(1);}shm_p = (int *)mmap(NULL, sizeof(int), PROT_WRITE|PROT_READ, MAP_SHARED, shmfd, 0);if (MAP_FAILED == shm_p) {perror("mmap()");exit(1);}*shm_p = 0;for (count=0;count<COUNT;count++) {pid = fork();if (pid < 0) {perror("fork()");exit(1);}if (pid == 0) {do_child(SHMPATH);}}for (count=0;count<COUNT;count++) {wait(NULL);}printf("shm_p: %d\n", *shm_p);munmap(shm_p, sizeof(int));close(shmfd);shm_unlink(SHMPATH);sleep(3000);mylock_destroy(sem);exit(0);
}
參考
https://zorrozou.github.io/docs/books/linuxde-jin-cheng-jian-tong-4fe1-xin-hao-liang.html