我們知道進程間通信的方法有多種,主要有管道,消息隊列,信號量,共享內存,socket等。之前介紹過管道,今天再介紹一個新的概念–消息隊列。
消息隊列:將一個進程到另一個進程之間發送數據塊的方式。這些發送的數據塊需要存在一個消息隊列緩沖區中。這些數據塊都是有一定的類型的,我們可以通過存放消息的數據塊來避免命名管道與匿名管道所帶來的同步與阻塞問題。與管道不同的是,消息隊列是基于消息的,而管道是基于字節流的。同時,消息隊列中存放的消息的數量是有限的。有最大長度(MSGMAX)和消息隊列的總的字節數(MSGMNB)。也有系統規定的消息隊列的總數(MSGMNI)。
內核為每個IPC對象維護了一個數據結構:struct ipc;
struct ipc_perm {
key_t __key; /* ftok所獲取的唯一標識的key值*/
uid_t uid; /* 擁有者的有效uid*/
gid_t gid; /* 擁有者的有效gid */
uid_t cuid; /* 創建者的有效uid*/
gid_t cgid; /* 創建者的有效gid */
unsigned short mode; /* 權限 */
unsigned short __seq; /* 序列號*/
注:
消息隊列,信號量,共享內存都有一個ipc數據結構。
在/usr/include/linux/msg.h下,有一個消息隊列的數據結構:
由圖可以看到ipc_perm就是剛剛說的幾種通信方式所共有的數據結構。其他的一些就是消息隊列結構體所私有的特性。還有其中的msg_first指針和msg_last指針分別指向消息隊列的第一條消息和最后的消息。消息隊列是用鏈表實現的。
同樣,這幅圖也是在msg.h中的。這里是消息緩沖區結構體和對于消息的一些信息。msgbuf結構體中的mtype是寫的消息的長度大小,即mtext的size。數組mtext即是存放消息的數組。
消息隊列的具體實現:
1.創建消息隊列:
int msgget(key_t key,int msgflag);
key可以認為是端口號,由ftok生成一個唯一的key值。用來創建消息隊列。
msgflag:是創建的消息隊列的方式,有IPC_CREAT和IPC_EXCL。
(1)IPC_CREAT:單獨使用時,如果已經存在已有的IPC資源,就直接返回已存在的IPC,如果不存在則創建一個新的IPC資源。
(2)IPC_CREAT|IPC_EXCL:同時使用時表示,不存在則創建一個,存在的話返回錯誤消息。
2.將消息發送到消息隊列中
int msgsnd(int msgid,const void* msgp,size_t msgsz,int msgflg);
msgid:表示唯一確定的一個消息隊列的id。
msgp: 表示存放數據的消息緩沖區
msgsz:消息文本的大小
msgflg:表示在隊列沒有數據的情況下進行的操作。一般設置為0,表示當隊列空或滿的時候,以阻塞式等待的方式進行處理。
3.從消息隊列中取消息
ssize_t msgrgv(int msgid,const void* msgp,size_t msgsz,long msgtyp,int msgflg);
和上述往消息隊列中寫數據類似,這里取消息時,有msgtyp表示取哪種類型的消息,即消息隊列中讀取的消息形態。
4.設置消息隊列的屬性
int msgctl(int msgid,int cmd,struct msqid_ds *buf);
這里主要是對destroy進行操作,將cmd設置為IPC_RMID即可。
代碼實現(server端與client端的通信):
//comm.h
#include<stdio.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<string.h>#define PATHNAME "."
#define PROJ_ID 0x6666
#define SERVER_TYPE 1
#define CLIENT_TYPE 2struct msgbuf {long mtype;char mtext[1024];
};int createMsgqueue();
int getMsg();
int destroyMsg(int msgid);
int recvMsg(int msgid,long recvtype,char out[]);
int sendMsg(int msgid,long who,char* msg);#endif //_COMM_H//comm.c
#include"comm.h"int commMsgqueue(int flags)
{key_t _key = ftok(PATHNAME,PROJ_ID);if(_key < 0){perror("ftok");return -1;}int msgid = msgget(_key,flags);if(msgid < 0){perror("msgget");return -2;}return msgid;
}int createMsgqueue()
{return commMsgqueue(IPC_CREAT | IPC_EXCL|0666);
}
int getMsg()
{return commMsgqueue(IPC_CREAT);
}
int destroyMsg(int msgid)
{if(msgctl(msgid,IPC_RMID,NULL) < 0){perror("msgctl");return -1;}return 0;
}int recvMsg(int msgid,long recvtype,char out[])
{struct msgbuf buf;if(msgrcv(msgid,(void*)&buf,sizeof(buf.mtext),recvtype,0) < 0){perror("msgrcv");return -1;}strcpy(out,buf.mtext);return 0;
}int sendMsg(int msgid,long who,char* msg)
{struct msgbuf buf;buf.mtype=who;strcpy(buf.mtext,msg);if(msgsnd(msgid,(void*)&buf,sizeof(buf.mtext),0) < 0){perror("msgsnd");return -1;}return 0;
}//server.c
#include"comm.h"
#include<stdio.h>int main()
{int msgid = createMsgqueue();char buf[1024];while(1){buf[0]=0;recvMsg(msgid,CLIENT_TYPE,buf);printf("client# %s\n",buf);printf("please Enter# ");fflush(stdout);ssize_t s = read(0,buf,sizeof(buf));if(s > 0){buf[s-1] = 0;sendMsg(msgid,SERVER_TYPE,buf);printf("send done,wait recv...\n");}}destroyMsg(msgid);return 0;
}//client.c
#include"comm.h"
#include<stdio.h>int main()
{int msgid = createMsgqueue();char buf[1024];while(1){buf[0]=0;recvMsg(msgid,CLIENT_TYPE,buf);printf("client# %s\n",buf);printf("please Enter# ");fflush(stdout);ssize_t s = read(0,buf,sizeof(buf));if(s > 0){buf[s-1] = 0;sendMsg(msgid,SERVER_TYPE,buf);printf("send done,wait recv...\n");}}destroyMsg(msgid);return 0;
}
[xiaoxu@bogon msgqueue]$ cat client.c
#include<stdio.h>
#include"comm.h"int main()
{int msgid = getMsg();char buf[1024];while(1){buf[0]=0;printf("please Enter# ");fflush(stdout);ssize_t s = read(0,buf,sizeof(buf));if(s > 0){buf[s-1] = 0;sendMsg(msgid,CLIENT_TYPE,buf);printf("send done,wait recv...\n");}recvMsg(msgid,SERVER_TYPE,buf);printf("server# %s\n",buf);
}return 0;
}
//Makefile
.PHONY:all
all:client serverclient:client.c comm.cgcc -o $@ $^
server:server.c comm.cgcc -o $@ $^.PHONY:clean
clean:rm -f server client