現在我們來討論第三種也是最后一種System V IPV工具:消息隊列。在許多方面看來,消息隊列類似于有名管道,但是卻沒有與打開與關閉管道的復雜關聯。然而,使用消息隊列并沒有解決我們使用有名管道所遇到的問題,例如管道上的阻塞。
?
消息隊列提供了一種在兩個不相關的進程之間傳遞數據的簡單高效的方法。與有名管道比較起來,消息隊列的優點在獨立于發送與接收進程,這減少了在打開與關閉有名管道之間同步的困難。
?
消息隊列提供了一種由一個進程向另一個進程發送塊數據的方法。另外,每一個數據塊被看作有一個類型,而接收進程可以獨立接收具有不同類型的數據塊。消息隊列的好處在于我們幾乎可以完全避免同步問題,并且可以通過發送消息屏蔽有名管道的問題。更好的是,我們可以使用某些緊急方式發送消息。壞處在于,與管道類似,在每一個數據塊上有一個最大尺寸限制,同時在系統中所有消息隊列上的塊尺寸上也有一個最大尺寸限制。
?
盡管有這些限制,但是X/Open規范并沒有定義這些限制的具體值,除了指出超過這些尺寸是某些消息隊列功能失敗的原因。Linux系統有兩個定義,MSGMAX與MSGMNB,這分別定義單個消息與一個隊列的最大尺寸。這些宏定義在其他系統上也許并不相同,甚至也許就不存在。
?
消息隊列函數定義如下:
?
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
int msgget(key_t key, int msgflg);
int msgrcv(int msqid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg);
int msgsnd(int msqid, const void *msg_ptr, size_t msg_sz, int msgflg);
?
與信息號和共享內存一樣,頭文件sys/types.h與sys/ipc.h通常也是需要的。
?
msgget
?
我們可以使用msgget函數創建與訪問一個消息隊列:
?
int msgget(key_t key, int msgflg);
?
與其他IPC工具類似,程序必須提供一個指定一個特定消息隊列的key值。特殊值IPC_PRIVATE創建一個私有隊列,這在理論上只可以為當前進程所訪問。與信息量和共享內存一樣,在某些Linux系統上,消息隊列并不是私有的。因為私有隊列用處較少,因而這并不是一個嚴重問題。與前面一樣,第二個參數,msgflg,由9個權限標記組成。要創建一個新的消息隊列,由IPC_CREAT特殊位必須與其他的權限位進行或操作。設置IPC_CREAT標記與指定一個已存在的消息隊列并不是錯誤。如果消息隊列已經存在,IPC_CREAT標記只是簡單的被忽略。
?
如果成功,msgget函數會返回一個正數作為隊列標識符,如果失敗則會返回-1。
?
msgsnd
?
msgsnd函數允許我們將消息添加到消息隊列:
?
int msgsnd(int msqid, const void *msg_ptr, size_t msg_sz, int msgflg);
?
消息結構由兩種方式來限定。第一,他必須小于系統限制,第二,必須以long int開始,這在接收函數中會用作一個消息類型。當我們在使用消息時,最好是以如下形式來定義我們的消息結構:
?
struct my_message {
????long int message_type;
????/* The data you wish to transfer */
}
?
因為message_type用于消息接收,所以我們不能簡單的忽略他。我們必須定義我們自己的數據結構來包含并對其進行初始化,從而他可以包含一個可知的值。
?
第一個參數,msgid,是由msgget函數所返回的消息隊列標識符。
?
第二個參數,msg_ptr,是一個指向要發送消息的指針,正如前面所描述的,這個消息必須以long int類型開始。
?
第三個參數,msg_sz,是由msg_ptr所指向的消息的尺寸。這個尺寸必須不包含long int消息類型。
?
第四個參數,msgflg,控制如果當前消息隊列已滿或是達到了隊列消息的系統限制時如何處理。如果msgflg標記設置了IPC_NOWAIT,函數就會立即返回而不發送消息,并且返回值為-1。如果msgflg標記清除了IPC_NOWAIT標記,發送進程就會被掛起,等待隊列中有可用的空間。
?
如果成功,函數會返回0,如果失敗,則會返回-1。如果調用成功,系統就會復制一份消息數據并將其放入消息隊列中。
?
msgrcv
?
msgrcv函數由一個消息隊列中收取消息:
?
int msgrcv(int msqid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg);
?
第一個參數,msqid,是由msgget函數所返回的消息隊列標記符。
?
第二個參數,msg_ptr,是一個指向將要接收消息的指針,正如在msgsnd函數中所描述的,這個消息必須以long int類型開始。
?
第三個參數,msg_sz,是由msg_ptr所指向的消息的尺寸,并不包含long int消息類型。
?
第四個參數,msgtype,是一個long int類型,允許一個接收優先級形式的實現。如果msgtype的值為0,隊列中第一個可用的消息就會被接收。如果其值大于0,具有相同消息類型的第一個消息就會被接收。如果其值小于0,第一個具有相同類型或是小于msgtype絕對值的消息就會被接收。
?
這聽起來要比實際操作復雜得多。如果我們只是簡單的希望以其發送的順序來接收消息,我們可以將msgtype設置為0。如果我們希望接收特殊消息類型的消息,我們可以將msgtype設置為等于這個值。如果我們希望接收消息類型為n或是小于n的值,我們可以將msgtype設置為-n。
?
第五個參數,msgflg,控制當沒有合適類型的消息正在等待被接收時如何處理。如果在msgflg中設置了IPC_NOWAIT位,調用就會立即返回,而返回值為-1。如果msgflg標記中消除了IPC_NOWAIT位,進程就會被掛起,等待一個合適類型的消息到來。
?
如果成功,msgrcv會返回放入接收緩沖區中的字節數,消息會被拷貝到由msg_ptr所指向的用戶分配緩沖區中,而數據就會由消息隊列中刪除。如果失敗則會返回-1。
?
msgctl
?
最后一個消息隊列函數是msgctl,這與共享內存中的控制函數十分類似。
?
int msgctl(int msqid, int command, struct msqid_ds *buf);
?
msqid_ds結構至少包含下列成員:
?
struct msqid_ds {
????uid_t msg_perm.uid;
????uid_t msg_perm.gid
????mode_t msg_perm.mode;
}
?
第一個參數,msqid,是由msgget函數所返回的標記符。
?
第二個參數,command,是要執行的動作。他可以取下面三個值:
?
命令 ???????描述
IPC_STAT ???設置msqid_ds結構中的數據來反射與消息隊列相關聯的值。
IPC_SET ???????如果進程有權限這樣做,這個命令會設置與msqid_ds數據結構中所提供的消息隊列相關聯的值。
IPC_RMID ???刪除消息隊列。
?
如果成功則會返回0,如果失敗則會返回-1。當進程正在msgsnd或是msgrcv函數中等待時如果消息隊列被刪除,發送或接收函數就會失敗。
?
試驗--消息隊列
?
現在我們已經了解了消息隊列的定義,我們可以來看一下他們是如何實際工作的。與前面一樣,我們將會編寫兩個程序:msg1.c來接收,msg2.c來發送。我們會允許任意一個程序創建消息隊列,但是使用接收者在接收到最后一條消息后刪除消息隊列。
?
1 下面是接收程序:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
?
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
?
struct my_msg_st
{
????long int my_msg_type;
????char some_text[BUFSIZ];
};
?
int main()
{
????int running = 1;
????int msgid;
????struct my_msg_st some_data;
????long int msg_to_receive = 0;
?
2 首先,我們設置消息隊列:
?
????msgid = msgget((key_t)1234,0666|IPC_CREAT);
?
????if(msgid == -1)
????{
????????fprintf(stderr,"msgget failed with error: %d\n", errno);
????????exit(EXIT_FAILURE);
????}
?
3 然后,接收消息隊列中的消息直到遇到一個end消息。最后,消息隊列被刪除:
?
????while(running)
????{
????????if(msgrcv(msgid, (void *)&some_data, BUFSIZ, msg_to_receive, 0) == -1)
????????{
????????????fprintf(stderr, "msgrcv failed with errno: %d\n", errno);
????????????exit(EXIT_FAILURE);
????????}
?
????????printf("You wrote: %s", some_data.some_text);
????????if(strncmp(some_data.some_text, "end", 3)==0)
????????{
????????????running = 0;
????????}
????}
?
????if(msgctl(msgid, IPC_RMID, 0)==-1)
????{
????????fprintf(stderr, "msgctl(IPC_RMID) failed\n");
????????exit(EXIT_FAILURE);
????}
?
????exit(EXIT_SUCCESS);
}
?
4 發送程序與msg1.c類似。在main函數中,刪除msg_to_receive聲明,代之以buffer[BUFSIZ]。移除消息隊列刪除代碼,并且在running循環中做出如下更改。現在我們調用msgsnd來將輸入的文本發送到隊列中。
?
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
?
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
?
#define MAX_TEXT 512
?
struct my_msg_st
{
????long int my_msg_type;
????char some_text[MAX_TEXT];
};
?
int main()
{
????int running = 1;
????struct my_msg_st some_data;
????int msgid;
????char buffer[BUFSIZ];
?
????msgid = msgget((key_t)1234, 0666|IPC_CREAT);
?
????if(msgid==-1)
????{
????????fprintf(stderr,"msgget failed with errno: %d\n", errno);
????????exit(EXIT_FAILURE);
????}
?
????while(running)
????{
????????printf("Enter some text: ");
????????fgets(buffer, BUFSIZ, stdin);
????????some_data.my_msg_type = 1;
????????strcpy(some_data.some_text, buffer);
?
????????if(msgsnd(msgid, (void *)&some_data, MAX_TEXT, 0)==-1)
????????{
????????????fprintf(stderr, "msgsnd failed\n");
????????????exit(EXIT_FAILURE);
????????}
?
????????if(strncmp(buffer, "end", 3) == 0)
????????{
????????????running = 0;
????????}
????}
?
????exit(EXIT_SUCCESS);
}
?
與管道中的例子不同,進程并沒有必要提供自己的同步機制。這是消息隊列比起管道的一個巨大優點。
?
假設消息隊列有空間,發送者可以創建隊列,在隊列中放入一些數據,并且甚至可以在接收者啟動之前退出。我們會首先運行發送者。如下面的例子輸出:
?
$ ./msg2
Enter some text: hello
Enter some text: How are you today?
Enter some text: end
$ ./msg1
You wrote: hello
You wrote: How are you today?
You wrote: end
$
?
工作原理
?
發送者程序使用msgget創建一個消息隊列;然后使用msgsnd函數向隊列中添加消息。接收者使用msgget來獲得消息隊列標識符,并且接收消息,直到接收到特殊消息end。然后他會使用msgctl刪除消息隊列進行一些清理工作。