本文介紹消息隊列,它允許進程之間以消息的形式交換數據。數據的交換單位是整個消息。
- POSIX 消息隊列是引用計數的。只有當所有當前使用隊列的進程都關閉了隊列之后才會對隊列進行標記以便刪除。
- POSIX 消息有一個關聯的優先級,并且消息之間是嚴格按照優先級順序排隊的(以及接收)。
- POSIX 消息隊列提供了一個特性允許在隊列中的一條消息可用時異步地通知進程。
POSIX 消息隊列支持是一個通過 CONFIG_POSIX_MQUEUE 選項配置的可選內核組件。
1 概述
POSIX 消息隊列 API 中的主要函數如下。
- mq_open()函數創建一個新消息隊列或打開一個既有隊列,返回后續調用中會用到的消息隊列描述符。
- mq_send()函數向隊列寫入一條消息。
- mq_receive()函數從隊列中讀取一條消息。
- mq_close()函數關閉進程之前打開的一個消息隊列。
- mq_unlink()函數刪除一個消息隊列名并當所有進程關閉該隊列時對隊列進行標記以便刪除。
上面的函數所完成的功能是相當明顯的。此外,POSIX 消息隊列 API 還具備一些特別的特性。
- 每個消息隊列都有一組關聯的特性,其中一些特性可以在使用 mq_open()創建或打開隊列時進行設置。獲取和修改隊列特性的工作則是由兩個函數來完成的:mq_getattr()和 mq_setattr()。
- mq_notify()函數允許一個進程向一個隊列注冊接收消息通知。在注冊完之后,當一條消息可用時會通過發送一個信號或在一個單獨的線程中調用一個函數來通知進程。
2 打開、關閉和斷開鏈接消息隊列
2.1 打開一個消息隊列
mq_open()函數創建一個新消息隊列或打開一個既有隊列。
#include <fcntl.h> /*Defines 0*constants */
#include<sys/stat.h> /*Defines mode constants */
#include <mqueue.h>mqd_t mq_open(const char *name, int oflag, .../* mode t mode, struct mq_attr *attr */);/* Returns a message queue descriptor on success, or (mgd_t)-l on error */
name 參數標識出了消息隊列。
oflag 參數是一個位掩碼,它控制著 mq_open()操作的各個方面。下表對這個掩碼中可以包含的值進行了總結。
oflag 參數的其中一個用途是,確定是打開一個既有隊列還是創建和打開一個新隊列。如果在 oflag 中不包含 O_CREAT,那么將會打開一個既有隊列。如果在 oflag 中包含了 O_CREAT,并且與給定的 name 對應的隊列不存在,那么就會創建一個新的空隊列。如果在 oflag 中同時包含 O_CREAT 和 O_EXCL,并且與給定的 name 對應的隊列已經存在,那么 mq_open()就會失敗。
oflag 參數還能夠通過包含 O_RDONLY、O_WRONLY 以及 O_RDWR 這三個值中的一個來表明調用進程在消息隊列上的訪問方式。
剩下的一個標記值 O_NONBLOCK 將會導致以非阻塞的模式打開隊列。如果后續的mq_receive()或 mq_send()調用無法在不阻塞的情況下執行,那么調用就會立即返回 EAGAIN 錯誤。
mq_open()通常用來打開一個既有消息隊列,這種調用只需要兩個參數,但如果在 flags中指定了 O_CREAT,那么就還需要另外兩個參數:mode 和 attr。(如果通過 name 指定的隊列已經存在,那么這兩個參數會被忽略。)這些參數的用法如下。
- mode 參數是一個位掩碼,它指定了施加于新消息隊列之上的權限。這個參數可取的位值與文件上的掩碼值是一樣的,并且與 open()一樣,mode 中的值會與進程的 umask取掩碼。要從一個隊列中讀取消息(mq_receive())就必須要將讀權限賦予相應的用戶,要向隊列寫入消息(mq_send())就需要寫權限。
- attr 參數是一個 mq_attr 結構,它指定了新消息隊列的特性。如果 attr 為 NULL,那么將使用實現定義的默認特性創建隊列。
mq_open()在成功結束時會返回一個消息隊列描述符,它是一個類型為 mqd_t 的值,在后續的調用中將會使用它來引用這個打開著的消息隊列。即需要確保這個類型是一個能在賦值語句中使用或能作為函數參數傳遞的的類型。(如在 Linux 上,mqd_t 是一個 int)
2.2 fork()、exec()以及進程終止對消息隊列描述符的影響
在 fork()中子進程會接收其父進程的消息隊列描述符的副本,并且這些描述符會引用同樣的打開著的消息隊列描述。子進程不會繼承其父進程的任何消息通知注冊。
當一個進程執行了一個 exec()或終止時,所有其打開的消息隊列描述符會被關閉。關閉消息隊列描述符的結果是進程在相應隊列上的消息通知注冊會被注銷。
2.3 關閉一個消息隊列
mq_close()函數關閉消息隊列描述符 mqdes。
#include <mqueue.h>
int mq_close(mqd_t mqdes);/* Returns 0 on success, or -l on error */
如果調用進程已經通過 mqdes 在隊列上注冊了消息通知,那么通知注冊會自動被刪除,并且另一個進程可以隨后向該隊列注冊消息通知。
當進程終止或調用 exec()時,消息隊列描述符會被自動關閉。與文件描述符一樣,應用程序應該在不再使用消息隊列描述符的時候顯式地關閉消息隊列描述符以防止出現進程耗盡消息隊列描述符的情況。
與文件上的 close()一樣,關閉一個消息隊列并不會刪除該隊列。要刪除隊列則需要使用mq_unlink(),它是 unlink()在消息隊列上的版本。
2.4 刪除一個消息隊列
mq_unlink()函數刪除通過 name 標識的消息隊列,并將隊列標記為在所有進程使用完該隊列之后銷毀該隊列(這可能意味著會立即刪除,前提是所有打開該隊列的進程已經關閉了該隊列)。
#include <mqueue.h>
int mq_unlink(const char *name);/* Returns 0 on success, or -l on error */
示例程序顯示了 mq_unlink()的用法。
/* pmsg_unlink.cUsage: pmsg_unlink mq-nameUnlink a POSIX message queue.使用 mq_unlink()斷開一個 POSIX 消息隊列的鏈接Linux supports POSIX message queues since kernel 2.6.6.
*/
#include <mqueue.h>
#include "tlpi_hdr.h"int
main(int argc, char *argv[])
{if (argc != 2 || strcmp(argv[1], "--help") == 0)usageErr("%s mq-name\n", argv[0]);if (mq_unlink(argv[1]) == -1)errExit("mq_unlink");exit(EXIT_SUCCESS);
}
3 描述符和消息隊列之間的關系
消息隊列描述符和打開著的消息隊列之間的關系 與 文件描述符和打開著的文件描述之間的關系類似。消息隊列描述符是一個進程級別的句柄,它引用了系統層面的打開著的消息隊列描述表中的一個條目,而該條目則引用了一個消息隊列對象。下圖對這種關系進行了描繪。
在 Linux 上,消息隊列被實現成了虛擬文件系統中的 i-node,并且消息隊列描述符和打開著的消息隊列描述分別被實現成了文件描述符和打開著的文件描述。
下圖有助于闡明消息隊列描述符的使用方面的細節問題(所有這些都與文件描述符的使用類似)。
- 一個打開的消息隊列描述擁有一組關聯的標記。SUSv3 只規定了一種這樣的標記,即NONBLOCK,它確定了 I/O 是否是非阻塞的。
- 兩個進程能夠持有引用同一個打開的消息隊列描述的消息隊列描述符(圖中的描述符x)。當一個進程在打開了一個消息隊列之后調用 fork()時就會發生這種情況。這些描述符會共享 O_NONBLOCK 標記的狀態。
- 兩個進程能夠持有引用不同消息隊列描述(它們引用了同一個消息隊列)的打開的消息隊列描述(如進程 A 中的描述符 z 和進程 B 中的描述符 y 都引用了/mq-r)。當兩個進程分別使用 mq_open()打開同一個隊列時就會發生這種情況。
4 消息隊列特性
mq_open()、mq_getattr()以及 mq_setattr()函數都會接收一個參數,它是一個指向 mq_attr結構的指針。這個結構是在<mqueue.h>中進行定義的,其形式如下。
struct mq_attr {long mq_flags; /* Message queue description flags:0 or O_NONBLOCK [mq_getattr(),mq_setattr()] */long mq_maxmsg; /* Maximum number of messages on queue [mq_open(),mq_getattr()] */long mq_msgsize; /* Maximum message size(in bytes) [mq_open(),mq_getattr()] */long mq_curmsgs; /* Number of messages currently in queue [mq_getattr()] */
};
在開始深入介紹 mq_attr 的細節之前有必要指出以下幾點。
- 這三個函數中的每個函數都只用到了其中幾個字段。上面給出的結構定義中的注釋指出了各個函數所用到的字段。
- 這個結構包含了與一個消息描述符相關聯的打開的消息隊列描述(mq_flags)的相關信息以及該描述符所引用的隊列的相關信息(mq_maxmsg、mq_msgsize、mq_curmsgs)。
- 其中一些字段中包含的信息在使用 mq_open()創建隊列時就已經確定下來了(mq_maxmsg 和 mq_msgsize);其他字段則會返回消息隊列描述(mq_flags)或消息隊列(mq_curmsgs)的當前狀態的相關信息。
4.1 在創建隊列時設置消息隊列特性
在使用 mq_open()創建消息隊列時可以通過下列 mq_attr 字段來確定隊列的特性。
- mq_maxmsg 字段定義了使用mq_send()向消息隊列添加消息的數量上限,其取值必須大于0。
- mq_msgsize 字段定義了加入消息隊列的每條消息的大小的上限,其取值必須大于 0。
內核根據這兩個值來確定消息隊列所需的最大內存量。
mq_maxmsg 和 mq_msgsize 特性是在消息隊列被創建時就確定下來的,并且之后也無法修改這兩個特性。
示例程序為 mq_open()函數提供了一個命令行界面并展示了在 mq_open()中如何使用 mq_attr 結構。
消息隊列特性可以通過兩個命令行參數來指定:–m 用于指定 mq_maxmsg,–s 用于指定mq_msgsize。
- 只要指定了其中一個選項,那么一個非 NULL 的 attrp 參數就會被傳遞給mq_open()。
- 如果在命令行中只指定了–m 和–s 選項中的一個,那么 attrp 指向的 mq_attr 結構中的一些字段就會取默認值。
- 如果兩個選項都被沒有被指定,那么在調用 mq_open()時會將attrp 指定為 NULL,這將會導致使用由實現定義的隊列特性的默認值來創建隊列。
/* pmsg_create.cCreate a POSIX message queue.創建一個 POSIX 消息隊列Usage as shown in usageError().Linux supports POSIX message queues since kernel 2.6.6.
*/
#include <mqueue.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "tlpi_hdr.h"static void
usageError(const char *progName)
{fprintf(stderr, "Usage: %s [-cx] [-m maxmsg] [-s msgsize] mq-name ""[octal-perms]\n", progName);fprintf(stderr, " -c Create queue (O_CREAT)\n");fprintf(stderr, " -m maxmsg Set maximum # of messages\n");fprintf(stderr, " -s msgsize Set maximum message size\n");fprintf(stderr, " -x Create exclusively (O_EXCL)\n");exit(EXIT_FAILURE);
}int
main(int argc, char *argv[])
{int flags, opt;mode_t perms;mqd_t mqd;struct mq_attr attr, *attrp;/* If 'attrp' is NULL, mq_open() uses default attributes. If anoption specifying a message queue attribute is supplied on thecommand line, we save the attribute in 'attr' and set 'attrp'pointing to 'attr'. We assign some (arbitrary) default valuesto the fields of 'attr' in case the user specifies the valuefor one of the queue attributes, but not the other. */attrp = NULL;attr.mq_maxmsg = 10;attr.mq_msgsize = 2048;flags = O_RDWR;/* Parse command-line options */while ((opt = getopt(argc, argv, "cm:s:x")) != -1) {switch (opt) {case 'c':flags |= O_CREAT;break;case 'm':attr.mq_maxmsg = atoi(optarg);attrp = &attr;break;case 's':attr.mq_msgsize = atoi(optarg);attrp = &attr;break;case 'x':flags |= O_EXCL;break;default:usageError(argv[0]);}}if (optind >= argc)usageError(argv[0]);perms = (argc <= optind + 1) ? (S_IRUSR | S_IWUSR) :getInt(argv[optind + 1], GN_BASE_8, "octal-perms");mqd = mq_open(argv[optind], flags, perms, attrp);if (mqd == (mqd_t) -1)errExit("mq_open");exit(EXIT_SUCCESS);
}
4.2 獲取消息隊列特性
mq_getattr()函數返回一個包含與描述符 mqdes 相關聯的消息隊列描述和消息隊列的相關信息的 mq_attr 結構。
#include <mqueue.h>
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);/* Returs 0 on success, or -l on error */
除了上面已經介紹的 mq_maxmsg 和 mq_msgsize 字段之外,attr 指向的返回結構中還包含下列字段。
- mq_flags
這些是與描述符 mqdes 相關聯的打開的消息隊列描述的標記,其取值只有一個:O_NONBLOCK。這個標記是根據 mq_open()的 oflag 參數來初始化的,并且使用 mq_setattr()可以修改這個標記。 - mq_curmsgs
當前位于隊列中的消息數。這個信息在 mq_getattr()返回時可能已經發生了改變,前提是存在其他進程從隊列中讀取消息或向隊列寫入消息。
示例程序使用了 mq_getattr()來獲取通過命令行參數指定的消息隊列的特性,然后在標準輸出中顯示這些特性。
/* pmsg_getattr.cDisplay attributes of a POSIX message queue.獲取 POSIX 消息隊列特性Linux supports POSIX message queues since kernel 2.6.6.
*/
#include <mqueue.h>
#include "tlpi_hdr.h"int
main(int argc, char *argv[])
{mqd_t mqd;struct mq_attr attr;if (argc != 2 || strcmp(argv[1], "--help") == 0)usageErr("%s mq-name\n", argv[0]);mqd = mq_open(argv[1], O_RDONLY);if (mqd == (mqd_t) -1)errExit("mq_open");if (mq_getattr(mqd, &attr) == -1)errExit("mq_getattr");printf("Maximum # of messages on queue: %ld\n", attr.mq_maxmsg);printf("Maximum message size: %ld\n", attr.mq_msgsize);printf("# of messages currently on queue: %ld\n", attr.mq_curmsgs);exit(EXIT_SUCCESS);
}
下面的 shell會話使用了示例程序來創建一個消息隊列并使用實現定義的默認值來初始化其特性(即傳入 mq_open()的最后一個參數為 NULL),然后顯示隊列特性,這樣就能夠看到 Linux 上的默認設置了。
$ ./pmsg_create -cx /mq
$ ./pmsg_getattr /mq
Maximum # of messages on queue: 10
Maximum message size: 8192
# of messages currently on queue: 0
$ ./pmsg_unlink /mq
從上面的輸出中可以看出 Linux 上 mq_maxmsg 和 mq_msgsize 的默認取值分別為 10 和8192。
4.3 修改消息隊列特性
mq_setattr()函數設置與消息隊列描述符 mqdes 相關聯的消息隊列描述的特性,并可選地返回與消息隊列有關的信息。
#include <mqueue.h>
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);/* Returns 0 on success, or-l on error */
mq_setattr()函數執行下列任務。
- 它使用 newattr 指向的 mq_attr 結構中的 mq_flags 字段來修改與描述符 mqdes 相關聯的消息隊列描述的標記(POSIX 規定使用 mq_setattr()能夠修改的唯一特性是 O_NONBLOCK 標記的狀態)。
- 如果 oldattr 不為 NULL,那么就返回一個包含之前的消息隊列描述標記和消息隊列特性的 mq_attr 結構(即與 mq_getattr()執行的任務一樣)。
應用程序應該通過使用 mq_getattr() 來 獲 取 mq_flags 值并修改O_NONBLOCK 位來修改 O_NONBLOCK 標記的狀態以及調用 mq_setattr()來修改 mq_flags 設置。如為啟用 O_NONBLOCK 需要編寫下列代碼:
if(mg_getattr(mqd,&attr)==-1)errExit("mq getattr");
attr.mq_flagS=O_NONBLOCK;
if(mq_setattr(mqd,&attr,NULL)==-1)errExit("mq_getattr");
5 交換消息
5.1 發送消息
mq_send()函數將位于 msg_ptr 指向的緩沖區中的消息添加到描述符 mqdes 所引用的消息隊列中。
#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);/* Returns 0 on success, or -l on error */
msg_len 參數指定了 msg_ptr 指向的消息的長度,其值必須小于或等于隊列的 mq_msgsize特性,否則 mq_send()就會返回 EMSGSIZE 錯誤。長度為零的消息是允許的。
每條消息都擁有一個用非負整數表示的優先級,它通過 msg_prio 參數指定。消息在隊列中是按照優先級倒序排列的(即 0 表示優先級最低)。當一條消息被添加到隊列中時,它會被放置在隊列中具有相同的優先級的所有消息之后。如果一個應用程序無需使用消息優先級,那么只需要將 msg_prio 指定為 0 即可。
Linux上 這個上限至少是 32768(_POSIX_MQ_PRIO_MAX),即優先級的取值范圍至少為 0 到 32767。
如果消息隊列已經滿了(即已經達到了隊列的 mq_maxmsg 限制),那么后續的 mq_send()調用會阻塞直到隊列中存在可用空間為止或者在 O_NONBLOCK 標記起作用時立即失敗并返回 EAGAIN 錯誤。
示例程序為 mq_send()函數提供了一個命令行界面。
/* pmsg_send.cUsage as shown in usageError().Send a message (specified as a command line argument) to a POSIX message queue.向 POSIX 消息隊列寫入一條消息See also pmsg_receive.c.Linux supports POSIX message queues since kernel 2.6.6.
*/
#include <mqueue.h>
#include <fcntl.h> /* For definition of O_NONBLOCK */
#include "tlpi_hdr.h"static void
usageError(const char *progName)
{fprintf(stderr, "Usage: %s [-n] mq-name msg [prio]\n", progName);fprintf(stderr, " -n Use O_NONBLOCK flag\n");exit(EXIT_FAILURE);
}int
main(int argc, char *argv[])
{int flags, opt;mqd_t mqd;unsigned int prio;flags = O_WRONLY;while ((opt = getopt(argc, argv, "n")) != -1) {switch (opt) {case 'n': flags |= O_NONBLOCK; break;default: usageError(argv[0]);}}if (optind + 1 >= argc)usageError(argv[0]);mqd = mq_open(argv[optind], flags);if (mqd == (mqd_t) -1)errExit("mq_open");prio = (argc > optind + 2) ? atoi(argv[optind + 2]) : 0;if (mq_send(mqd, argv[optind + 1], strlen(argv[optind + 1]), prio) == -1)errExit("mq_send");exit(EXIT_SUCCESS);
}
5.2 接收消息
mq_receive()函數從 mqdes 引用的消息隊列中刪除一條優先級最高、存在時間最長的消息并將刪除的消息放置在 msg_ptr 指向的緩沖區。
#include <mqueue.h>
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);/* Returns number of bytes in received message on success, or -l on error */
調用者使用 msg_len 參數來指定 msg_ptr 指向的緩沖區中的可用字節數。
不管消息的實際大小是什么,msg_len(即 msg_ptr 指向的緩沖區的大小)必須要大于或等于隊列的 mq_msgsize 特性,否則 mq_receive()就會失敗并返回 EMSGSIZE 錯誤。如果不清楚一個隊列的 mq_msgsize 特性的值,那么可以使用 mq_getattr()來獲取這個值。(在一個包含多個協作進程的應用程序中一般無需使用 mq_getattr(),因為應用程序通常能夠提前確定隊列的 mq_msgsize 設置。)
如果 msg_prio 不為 NULL,那么接收到的消息的優先級會被復制到 msg_prio 指向的位置處。如果消息隊列當前為空,那么 mq_receive() 會阻塞直到存在可用的消息或在O_NONBLOCK 標記起作用時會立即失敗并返回 EAGAIN 錯誤。
示例程序為 mq_receive()函數提供了一個命令行界面,在 usageError()函數中給出了這個程序的命令格式。下面的 shell 會話演示了示例程序的用法。首先創建了一個消息隊列并向其發送了一些具備不同優先級的消息。
$ ./pmsg_create -cx /mq
$ ./pmsg_send /mq msg-a 5
$ ./pmsg_send /mq msg-b 0
$ ./pmsg_send /mq msg-c 10
然后執行一系列命令來從隊列中接收消息。
$ ./pmsg_receive /mq
Read 5 bytes; priority = 10
msg-c
$ ./pmsg_receive /mq
Read 5 bytes; priority = 5
msg-a
$ ./pmsg_receive /mq
Read 5 bytes; priority = 0
msg-b
從上面的輸出中可以看出,消息的讀取是按照優先級來進行的。
此刻,這個隊列是空的。當再次執行阻塞式接收時,操作就會阻塞。
$ ./pmsg_receive /mq
另一方面,如果執行了一個非阻塞接收,那么調用就會立即返回一個失敗狀態。
$ ./pmsg_receive -n /mq
ERROR [EAGAIN/EWOULDBLOCK Resource temporarily unavailable] mq_receive
/* pmsg_receive.cUsage as shown in usageError().Receive a message from a POSIX message queue, and write it onstandard output.從 POSIX 消息隊列中讀取一條消息See also pmsg_send.c.Linux supports POSIX message queues since kernel 2.6.6.
*/
#include <mqueue.h>
#include <fcntl.h> /* For definition of O_NONBLOCK */
#include "tlpi_hdr.h"static void
usageError(const char *progName)
{fprintf(stderr, "Usage: %s [-n] mq-name\n", progName);fprintf(stderr, " -n Use O_NONBLOCK flag\n");exit(EXIT_FAILURE);
}int
main(int argc, char *argv[])
{int flags, opt;mqd_t mqd;unsigned int prio;void *buffer;struct mq_attr attr;ssize_t numRead;flags = O_RDONLY;while ((opt = getopt(argc, argv, "n")) != -1) {switch (opt) {case 'n': flags |= O_NONBLOCK; break;default: usageError(argv[0]);}}if (optind >= argc)usageError(argv[0]);mqd = mq_open(argv[optind], flags);if (mqd == (mqd_t) -1)errExit("mq_open");/* We need to know the 'mq_msgsize' attribute of the queue inorder to determine the size of the buffer for mq_receive() */if (mq_getattr(mqd, &attr) == -1)errExit("mq_getattr");buffer = malloc(attr.mq_msgsize);if (buffer == NULL)errExit("malloc");numRead = mq_receive(mqd, buffer, attr.mq_msgsize, &prio);if (numRead == -1)errExit("mq_receive");printf("Read %ld bytes; priority = %u\n", (long) numRead, prio);if (write(STDOUT_FILENO, buffer, numRead) == -1)errExit("write");write(STDOUT_FILENO, "\n", 1);exit(EXIT_SUCCESS);
}
5.3 在發送和接收消息時設置超時時間
mq_timedsend()和 mq_timedreceive()函數與 mq_send()和 mq_receive()幾乎是完全一樣的,它們之間唯一的差別在于如果操作無法立即被執行,并且該消息隊列描述上的O_NONBLOCK 標記不起作用,那么 abs_timeout 參數就會為調用阻塞的時間指定一個上限。
#define _XOPEN_SOURCE 600
#include <mqueue.h>
#include <time.h>
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout);/* Returns 0 on success, or -l on error */
ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,unsigned int *msg_prio, const struct timespec *abs_timeout);/* Returns number of bytes in received message on success, or -l on error */
abs_timeout 參數是一個 timespec 結構,它將超時時間描述為自新紀元到現在的一個絕對值,其單位為秒數和納秒數。要指定一個相對超時則可以使用 clock_gettime()來獲取 CLOCK_REALTIME 時鐘的當前值并在該值上加上所需的時間量來生成一個恰當初始化過的 timespec 結構。
如果 mq_timedsend()或 mq_timedreceive()調用因超時而無法完成操作,那么調用就會失敗并返回 ETIMEDOUT 錯誤。
6 消息通知
POSIX 消息隊列能夠接收之前為空的隊列上有可用消息的異步通知(即隊列從空變成了非空)。這個特性意味著已經無需執行一個阻塞的調用或將消息隊列描述符標記為非阻塞并在隊列上定期執行 mq_receive()調用了,因為一個進程能夠請求消息到達通知,然后繼續執行其他任務直到收到通知為止。進程可以選擇通過信號的形式或通過在一個單獨的線程中調用一個函數的形式來接收通知。
mq_notify()函數注冊調用進程在一條消息進入描述符 mqdes 引用的空隊列時接收通知。
#include <mqueue.h>
int mq_notify(mqd_t mqdes, const struct sigevent *notification);/* Returns 0 on success, or -l on error */
notification 參數指定了進程接收通知的機制。在深入介紹 notification 參數的細節之前,有關消息通知需要注意以下幾點。
- 在任何一個時刻都只有一個進程(“注冊進程”)能夠向一個特定的消息隊列注冊接收通知。如果一個消息隊列上已經存在注冊進程了,那么后續在該隊列上的注冊請求將會失敗(mq_notify()返回 EBUSY 錯誤)。
- 只有當一條新消息進入之前為空的隊列時注冊進程才會收到通知。如果在注冊的時候隊列中已經包含消息,那么只有當隊列被清空之后有一條新消息達到之時才會發出通知。
- 當向注冊進程發送了一個通知之后就會刪除注冊信息,之后任何進程就能夠向隊列注冊接收通知了。換句話說,只要一個進程想要持續地接收通知,那么它就必須要在每次接收到通知之后再次調用 mq_notify()來注冊自己。
- 注冊進程只有在當前不存在其他在該隊列上調用 mq_receive()而發生阻塞的進程時才會收到通知。如果其他進程在 mq_receive()調用中被阻塞了,那么該進程會讀取消息,注冊進程會保持注冊狀態。
- 一個進程可以通過在調用 mq_notify()時傳入一個值為 NULL 的 notification 參數來撤銷自己在消息通知上的注冊信息。
示例程序中給出的是該結構的一個簡化版本,它只列出了與 mq_notify()相關的字段。
union sigval {int sival_int; /*Integer value for accompanying data */void *sival_ptr; /* Pointer value for accompanying data */
};struct sigevent {int sigev_notify; /* Notification method */int sigev_signo; /* Notification signal for SIGEV SIGNAL */union sigval sigev_value; /*Value passed to signal handler or thread function */void (*sigev_notify_function) (union sigval); /* Thread notification function */void *sigev_notify_attributes; /*Really 'pthread attr t'*/
這個結構的 sigev_notify 字段將會被設置成下列值中的一個。
- SIGEV_NONE
注冊這個進程接收通知,但當一條消息進入之前為空的隊列時不通知該進程。與往常一樣,當新消息進入空隊列之后注冊信息會被刪除。 - SIGEV_SIGNAL
通過生成一個在 sigev_signo 字段中指定的信號來通知進程。如果 sigev_signo 是一個實時信號,那么 sigev_value 字段將會指定信號都帶的數據。通過傳入信號處理器的siginfo_t 結構中的 si_value 字段或通過調用 sigwaitinfo()或 sigtimedwait()返回值能夠取得這部分數據。siginfo_t 結構中的下列字段也會被填充:si_code,其值為 SI_MESGQ;si_signo,其值是信號編號;si_pid,其值是發送消息的進程的進程 ID;以及 si_uid,其值是發送消息的進程的真實用戶 ID。(si_pid 和 si_uid 字段在其他大多數實現上不會被設置。) - SIGEV_THREAD
通過調用在 sigev_notify_function 中指定的函數來通知進程,就像是在一個新線程中啟動該函數一樣。sigev_notify_attributes 字段可以為 NULL 或是一個指向定義了線程的特性的 pthread_ attr_t 結構的指針。sigev_value 中指定的聯合 sigval 值將會作為參數傳入這個函數。
6.1 通過信號接收通知
示例程序提供了一個使用信號來進行消息通知的例子。這個程序執行了下列任務。
- 以非阻塞模式打開了一個通過命令行指定名稱的消息隊列①,確定了該隊列的mq_msgsize 特性的值②,并分配了一個大小為該值的緩沖區來接收消息③。
- 阻塞通知信號(SIGUSR1)并為其建立一個處理器④。
- 首次調用 mq_notify()來注冊進程接收消息通知⑤。
- 執行一個無限循環,在循環中執行下列任務。
(1) 調用 sigsuspend(),該函數會解除通知信號的阻塞狀態并等待直到信號被捕獲⑥。從這個系統調用中返回表示已經發生了一個消息通知。此刻,進程會撤銷消息通知的注冊信息。
(2) 調用 mq_notify()重新注冊進程接收消息通知⑦。
(3) 執行一個 while 循環從隊列中盡可能多地讀取消息以便清空隊列⑧。
/* mq_notify_sig.cUsage: mq_notify_sig mq-nameDemonstrate message notification via signals (catching the signals witha signal handler) on a POSIX message queue.通過信號接收消息通知
*/
#include <signal.h>
#include <mqueue.h>
#include <fcntl.h> /* For definition of O_NONBLOCK */
#include "tlpi_hdr.h"#define NOTIFY_SIG SIGUSR1static void
handler(int sig)
{/* Just interrupt sigsuspend() */
}/* This program does not handle the case where a message already exists onthe queue by the time the first attempt is made to register for messagenotification. In that case, the program would never receive a notification.See mq_notify_via_signal.c for an example of how to deal with that case. */int
main(int argc, char *argv[])
{struct sigevent sev;mqd_t mqd;struct mq_attr attr;void *buffer;ssize_t numRead;sigset_t blockMask, emptyMask;struct sigaction sa;if (argc != 2 || strcmp(argv[1], "--help") == 0)usageErr("%s mq-name\n", argv[0]);mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK); //以非阻塞模式打開了一個通過命令行指定名稱的消息隊列if (mqd == (mqd_t) -1)errExit("mq_open");/* Determine mq_msgsize for message queue, and allocate an input bufferof that size */if (mq_getattr(mqd, &attr) == -1) //確定了該隊列的mq_msgsize 特性的值errExit("mq_getattr");buffer = malloc(attr.mq_msgsize); //并分配了一個大小為該值的緩沖區來接收消息if (buffer == NULL)errExit("malloc");/* Block the notification signal and establish a handler for it */sigemptyset(&blockMask); //阻塞通知信號(SIGUSR1)并為其建立一個處理器sigaddset(&blockMask, NOTIFY_SIG);if (sigprocmask(SIG_BLOCK, &blockMask, NULL) == -1)errExit("sigprocmask");sigemptyset(&sa.sa_mask);sa.sa_flags = 0;sa.sa_handler = handler;if (sigaction(NOTIFY_SIG, &sa, NULL) == -1)errExit("sigaction");/* Register for message notification via a signal */sev.sigev_notify = SIGEV_SIGNAL; //首次調用 mq_notify()來注冊進程接收消息通知sev.sigev_signo = NOTIFY_SIG;if (mq_notify(mqd, &sev) == -1)errExit("mq_notify");sigemptyset(&emptyMask);for (;;) {sigsuspend(&emptyMask); /* 調用 sigsuspend(),該函數會解除通知信號的阻塞狀態并等待直到信號被捕獲 *//* Reregister for message notification */if (mq_notify(mqd, &sev) == -1) //調用 mq_notify()重新注冊進程接收消息通知errExit("mq_notify");while ((numRead = mq_receive(mqd, buffer, attr.mq_msgsize, NULL)) >= 0) //執行一個 while 循環從隊列中盡可能多地讀取消息以便清空隊列printf("Read %ld bytes\n", (long) numRead);if (errno != EAGAIN) /* Unexpected error */errExit("mq_receive");}
}
示例程序中存在很多方面值得詳細解釋。
- 程序阻塞了通知信號并使用 sigsuspend()來等待該信號,而沒有使用 pause(),這是為了防止出現程序在執行 for 循環中的其他代碼(即沒有因等待信號而阻塞)時錯過信號的情況。如果發生了這種情況,并且使用了 pause()來等待信號,那么下次調用 pause()時會阻塞,即使系統已經發出了一個信號。
- 程序以非阻塞模式打開了隊列,并且當一個通知發生之后使用一個 while 循環來讀取隊列中的所有消息。通過這種方式來清空隊列能夠確保當一條新消息到達之后會產生一個新通知。使用非阻塞模式意味著 while 循環在隊列被清空之后就會終止(mq_receive()會失敗并返回 EAGAIN 錯誤)。
- 在 for 循環中比較重要的一點是在讀取隊列中的所有消息之前重新注冊接收消息通知。如果顛倒了順序,如按照下面的順序:隊列中的所有消息都被讀取了,while 循環終止;另一個消息被添加到了隊列中;mq_notify()被調用以重新注冊接收消息通知。此刻,系統將不會產生新的通知信號,因為隊列已經非空了,其結果是程序在下次調用 sigsuspend()時會永遠阻塞。
6.2 通過線程接收通知
示例程序提供了一個使用線程來發布消息通知的例子。
- 當消息通知發生時,程序會在清空隊列之前重新啟用通知②。
- 采用了非阻塞模式使得在接收到一個通知之后可以在無需阻塞的情況下完全清空隊列⑤。
/* mq_notify_thread.cDemonstrate message notification via threads on a POSIX message queue.
*/
#include <pthread.h>
#include <mqueue.h>
#include <signal.h>
#include <fcntl.h> /* For definition of O_NONBLOCK */
#include "tlpi_hdr.h"/* This program does not handle the case where a message already exists onthe queue by the time the first attempt is made to register for messagenotification. In that case, the program would never receive a notification.See mq_notify_via_thread.c for an example of how to deal with that case. */static void notifySetup(mqd_t *mqdp);static void /* Thread notification function */
threadFunc(union sigval sv)
{ssize_t numRead;mqd_t *mqdp;void *buffer;struct mq_attr attr;mqdp = sv.sival_ptr;/* Determine mq_msgsize for message queue, and allocate an input bufferof that size */if (mq_getattr(*mqdp, &attr) == -1)errExit("mq_getattr");buffer = malloc(attr.mq_msgsize);if (buffer == NULL)errExit("malloc");/* Reregister for message notification */notifySetup(mqdp);while ((numRead = mq_receive(*mqdp, buffer, attr.mq_msgsize, NULL)) >= 0)printf("Read %ld bytes\n", (long) numRead);if (errno != EAGAIN) /* Unexpected error */errExit("mq_receive");free(buffer);
}static void
notifySetup(mqd_t *mqdp)
{struct sigevent sev;sev.sigev_notify = SIGEV_THREAD; /* Notify via thread */sev.sigev_notify_function = threadFunc;sev.sigev_notify_attributes = NULL;/* Could be pointer to pthread_attr_t structure */sev.sigev_value.sival_ptr = mqdp; /* Argument to threadFunc() */if (mq_notify(*mqdp, &sev) == -1)errExit("mq_notify");
}int
main(int argc, char *argv[])
{mqd_t mqd;if (argc != 2 || strcmp(argv[1], "--help") == 0)usageErr("%s mq-name\n", argv[0]);mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK);if (mqd == (mqd_t) -1)errExit("mq_open");notifySetup(&mqd);pause(); /* Wait for notifications via thread function */
}
示例程序的設計還需要注意以下幾點。
- 程序通過一個線程來請求通知需要將傳入 mq_notify()的 sigevent 結構的 sigev_notify字段的值指定為 SIGEV_THREAD 。線程的啟動函數 threadFunc() 是通過sigev_notify_function 字段來指定的③。
- 在啟用消息通知之后,主程序會永遠中止⑥;定時器通知是通過在一個單獨的線程中調用 threadFunc()來分發的①。
- 本來可以通過將消息隊列描述符 mqd 變成一個全局變量使之對 threadFunc()可見,但這里采用了一種不同的做法:將消息隊列描述符的地址放在了傳給 mq_notify()的sigev_value.sival_ptr 字段中④。當后面調用 threadFunc()時,這個參數會作為其參數被傳入到該函數中。
7 Linux 特有的特性
POSIX 消息隊列在 Linux 上的實現提供了一些非標準的卻相當有用的特性。
7.1 通過命令行顯示和刪除消息隊列對象
POSIX IPC 對象被實現成了虛擬文件系統中的文件,并且可以使用 ls和 rm 來列出和刪除這些文件。為列出和刪除 POSIX 消息隊列就必須要使用形如下面的命令來將消息隊列掛載到文件系統中。
# mount -t mqueue source target
source 可以是任意一個名字(通常將其指定為字符串 none),其唯一的意義是它將出現在/proc/mounts 中并且 mount 和 df 命令會顯示出這個名字。target 是消息隊列文件系統的掛載點。
下面的 shell 會話顯示了如何掛載消息隊列文件系統和顯示其內容。首先為文件系統創建一個掛載點并掛載它。
$ sudo mkdir /dev/mqueue
$ sudo mount -t mqueue none /dev/mqueue
接著顯示新掛載在/proc/mounts 中的記錄,然后顯示掛載目錄上的權限。
$ cat /proc/mounts | grep mqueue
none /dev/mqueue mqueue rw 0 0
$ ls -ld /dev/mqueue
drwxrwxrwt 2 root root 60 May 7 16:54 /dev/mqueue/
在 ls 命令的輸出中需要注意的一點是消息隊列文件系統在掛載時會自動為掛載目錄設置粘滯位。(從 ls 的輸出中的 other-execute 權限字段中有一個 t 就可以看出這一點。)這意味著非特權進程只能在它所擁有的消息隊列上執行斷開鏈接的操作。
接著創建一個消息隊列,使用 ls 來表明它在文件系統中是可見的,然后刪除該消息隊列。
$ ./pmsg_create -c /newq
$ ls /dev/mqueue
newq
$ rm /dev/mqueue/newq
7.2 獲取消息隊列的相關信息
可以顯示消息隊列文件系統中的文件的內容,每個虛擬文件都包含了其關聯的消息隊列的相關信息。
$ ./pmsg_create -c /mq
$ ./pmsg_send /mq abcdefg
$ cat /dev/mqueue/mq
QSIZE:7 NOTIFY:0 SIGNO:0 NOTIFY_PID:0
QSIZE 字段的值為隊列中所有數據的總字節數,剩下的字段則與消息通知相關。如果NOTIFY_PID 為非零,那么進程 ID 為該值的進程已經向該隊列注冊接收消息通知了,剩下的字段則提供了與這種通知相關的信息。
- NOTIFY 是一個與其中一個 sigev_notify 常量對應的值:0 表示 SIGEV_SIGNAL,1表示 SIGEV_NONE,2 表示 SIGEV_THREAD。
- 如果通知方式是 SIGEV_SIGNAL,那么 SIGNO 字段指出了哪個信號會用來分發消息通知。下面的 shell 會話對這些字段中包含的信息進行了說明。
$ ./mq_notify_sig /mq &
[1] 1920782
$ cat /dev/mqueue/mq
QSIZE:7 NOTIFY:0 SIGNO:10 NOTIFY_PID:1920782
$ kill %1
$ ./mq_notify_thread /mq &
[2] 1921732
[1] Terminated ./mq_notify_sig /mq
$ cat /dev/mqueue/mq
QSIZE:7 NOTIFY:2 SIGNO:0 NOTIFY_PID:1921732
7.3 使用另一種 I/O 模型操作消息隊列
在 Linux 實現上,消息隊列描述符實際上是一個文件描述符,因此可以使用 I/O 多路復用系統調用(select()和 poll())或 epoll API 來監控這個文件描述符。
8 消息隊列限制
SUSv3 為 POSIX 消息隊列定義了兩個限制。
- MQ_PRIO_MAX
它定義了一條消息的最大優先級。 - MQ_OPEN_MAX
一個實現可以定義這個限制來指明一個進程最多能打開的消息隊列數量。由于 Linux將消息隊列描述符實現成了文件描述符,因此適用于文件描述符的限制將適用于消息隊列描述符。(換句話說,在 Linux 上,每個進程以及系統所能打開的文件描述符的數量限制實際上會應用于文件描述符數量和消息隊列描述符數量之和。)
Linux 還提供了一些/proc 文件來查看和修改(需具備特權)控制 POSIX 消息隊列的使用的限制。下面這三個文件位于/proc/sys/fs/ mqueue 目錄中。
msg_max
這個限制為新消息隊列的 mq_maxmsg 特性的取值規定了一個上限(即使用 mq_open()創建隊列時 attr.mq_maxmsg 字段的上限值)。這個限制的默認值是 10,最小值是 1(在早于 2.6.28 的內核中是10),最大值由內核常量HARD_MSGMAX 定義,該常量的值是通過公式(131072 / sizeof(void *))計算得來的,在 Linux/x86-32 上其值為 32768。當一個特權進程(CAP_SYS_RESOURCE)調用 mq_open()時 msg_max 限制會被忽略,但 HARD_MSGMAX 仍然擔當著 attr.mq_maxmsg 的上限值的角色。
msgsize_max
這個限制為非特權進程創建的新消息隊列的 mq_msgsize 特性的取值規定了一個上限(即使用 mq_open()創建隊列時 attr.mq_msgsize 字段的上限值)。這個限制的默認值是 8192,最小值是 128(在早于 2.6.28 的內核中是 8192),最大值是 1048576(在早于 2.6.28 的內核中是INT_MAX)。當一個非特權進程(CAP_SYS_RESOURCE)調用 mq_open()時會忽略這個限制。
queues_max
這是一個系統級別的限制,它規定了系統上最多能夠創建的消息隊列的數量。一旦達到這個限制,就只有特權進程(CAP_SYS_RESOURCE)才能夠創建新隊列。這個限制的默認值是 256,其取值可以為范圍從 0 到 INT_MAX 之間的任意一個值。
Linux 還提供了 RLIMIT_MSGQUEUE 資源限制,它可以用來為屬于調用進程的真實用戶ID 的所有消息隊列所消耗的空間規定一個上限。