以下是一個實現,可以發送和接收任意類型的結構體消息,而不僅限于特定的CustomMsg
類型:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>// 通用消息結構體模板
// 要求: 所有消息結構體的第一個字段必須是long類型的mtype
#define MSG_HEADER long mtype// 消息隊列配置
#define MSG_QUEUE_KEY 0x1234 // 自定義消息隊列鍵值// 錯誤處理宏
#define ERROR_EXIT(msg) do { perror(msg); exit(EXIT_FAILURE); } while(0)// 創建消息隊列
int create_message_queue(key_t key) {int msgid;// 創建消息隊列 (IPC_CREAT | IPC_EXCL | 0666)if ((msgid = msgget(key, IPC_CREAT | IPC_EXCL | 0666)) == -1) {if (errno == EEXIST) {// 如果已存在,則直接獲取msgid = msgget(key, 0666);if (msgid == -1) {ERROR_EXIT("msgget failed to get existing queue");}} else {ERROR_EXIT("msgget failed to create new queue");}}return msgid;
}// 發送任意結構體消息
// msg: 指向包含MSG_HEADER的結構體指針
// msg_size: 結構體總大小
void send_struct_message(int msgid, void *msg, size_t msg_size) {// 計算實際數據大小 (減去mtype的大小)size_t data_size = msg_size - sizeof(long);if (msgsnd(msgid, msg, data_size, IPC_NOWAIT) == -1) {ERROR_EXIT("msgsnd failed");}printf("Sent %zu byte message (type %ld)\n", msg_size, ((long*)msg)[0]);
}// 接收任意結構體消息
// msg: 指向包含MSG_HEADER的結構體指針
// msg_size: 結構體總大小
// msg_type: 期望接收的消息類型
void receive_struct_message(int msgid, void *msg, size_t msg_size, long msg_type) {// 計算實際數據大小 (減去mtype的大小)size_t data_size = msg_size - sizeof(long);ssize_t bytes = msgrcv(msgid, msg, data_size, msg_type, 0);if (bytes == -1) {ERROR_EXIT("msgrcv failed");}printf("Received %zd byte message (type %ld)\n", bytes + sizeof(long), ((long*)msg)[0]);
}// 刪除消息隊列
void remove_message_queue(int msgid) {if (msgctl(msgid, IPC_RMID, NULL) == -1) {if (errno != EIDRM) { // 忽略已刪除的錯誤perror("msgctl IPC_RMID failed");}}
}// ===================== 使用示例 =====================// 示例消息結構體1
typedef struct {MSG_HEADER; // 必須作為第一個字段int sensor_id;float temperature;float humidity;unsigned long timestamp;
} SensorData;// 示例消息結構體2
typedef struct {MSG_HEADER; // 必須作為第一個字段char device_name[16];int state;int error_code;float voltage;float current;
} DeviceStatus;// 示例消息結構體3
typedef struct {MSG_HEADER; // 必須作為第一個字段short x;short y;short z;unsigned char accuracy;
} MotionData;int main() {int msgid = create_message_queue(MSG_QUEUE_KEY);pid_t pid = fork();if (pid < 0) {ERROR_EXIT("fork failed");}if (pid > 0) { // 父進程 - 發送者// 給接收者時間啟動sleep(1);// 發送SensorData消息SensorData sensor_msg = {.mtype = 1,.sensor_id = 101,.temperature = 25.6f,.humidity = 45.7f,.timestamp = 1234567890};send_struct_message(msgid, &sensor_msg, sizeof(SensorData));// 發送DeviceStatus消息DeviceStatus device_msg = {.mtype = 2,.device_name = "Main Controller",.state = 1,.error_code = 0,.voltage = 3.3f,.current = 0.75f};send_struct_message(msgid, &device_msg, sizeof(DeviceStatus));// 發送MotionData消息MotionData motion_msg = {.mtype = 3,.x = 1024,.y = -512,.z = 256,.accuracy = 95};send_struct_message(msgid, &motion_msg, sizeof(MotionData));// 等待接收者處理sleep(1);// 刪除消息隊列remove_message_queue(msgid);} else { // 子進程 - 接收者// 接收SensorData消息SensorData recv_sensor;receive_struct_message(msgid, &recv_sensor, sizeof(SensorData), 1);printf("SensorData: ID=%d, Temp=%.1f°C, Hum=%.1f%%, Time=%lu\n",recv_sensor.sensor_id, recv_sensor.temperature,recv_sensor.humidity, recv_sensor.timestamp);// 接收DeviceStatus消息DeviceStatus recv_device;receive_struct_message(msgid, &recv_device, sizeof(DeviceStatus), 2);printf("DeviceStatus: Name='%s', State=%d, Error=%d, V=%.2fV, I=%.2fA\n",recv_device.device_name, recv_device.state,recv_device.error_code, recv_device.voltage, recv_device.current);// 接收MotionData消息MotionData recv_motion;receive_struct_message(msgid, &recv_motion, sizeof(MotionData), 3);printf("MotionData: X=%d, Y=%d, Z=%d, Accuracy=%d%%\n",recv_motion.x, recv_motion.y, recv_motion.z, recv_motion.accuracy);}return EXIT_SUCCESS;
}
消息隊列限制:
使用
msgctl(IPC_STAT)
檢查隊列狀態監控隊列使用情況,避免溢出
這個實現提供了高度靈活的消息傳遞機制,適用于各種嵌入式場景,從簡單的傳感器數據采集到復雜的設備控制命令,都可以通過定義適當的結構體來實現高效通信。
=========================阻塞和非阻塞接收方式===============================
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <stdbool.h>// 通用消息頭要求
#define MSG_HEADER long mtype// 消息隊列配置
#define MSG_QUEUE_KEY 0x1234// 錯誤處理宏
#define ERROR_EXIT(msg) do { perror(msg); exit(EXIT_FAILURE); } while(0)// 創建消息隊列
int create_message_queue(key_t key) {int msgid = msgget(key, IPC_CREAT | 0666);if (msgid == -1) {ERROR_EXIT("msgget failed");}return msgid;
}// 發送任意結構體消息(阻塞)
void send_struct_message(int msgid, void *msg, size_t msg_size) {size_t data_size = msg_size - sizeof(long);if (msgsnd(msgid, msg, data_size, 0) == -1) { // 阻塞發送ERROR_EXIT("msgsnd failed");}printf("Sent %zu byte message (type %ld)\n", msg_size, ((long*)msg)[0]);
}// 阻塞接收任意結構體消息
bool receive_struct_message_blocking(int msgid, void *msg, size_t msg_size, long msg_type) {size_t data_size = msg_size - sizeof(long);ssize_t bytes = msgrcv(msgid, msg, data_size, msg_type, 0);if (bytes == -1) {return false;}printf("Received %zd byte message (type %ld) [blocking]\n", bytes + sizeof(long), ((long*)msg)[0]);return true;
}// 非阻塞接收任意結構體消息
bool receive_struct_message_nonblocking(int msgid, void *msg, size_t msg_size, long msg_type) {size_t data_size = msg_size - sizeof(long);ssize_t bytes = msgrcv(msgid, msg, data_size, msg_type, IPC_NOWAIT);if (bytes == -1) {if (errno == ENOMSG) {// 沒有消息不是錯誤,只是需要重試return false;}ERROR_EXIT("msgrcv failed");}printf("Received %zd byte message (type %ld) [non-blocking]\n", bytes + sizeof(long), ((long*)msg)[0]);return true;
}// 帶超時的接收(混合模式)
bool receive_struct_message_timeout(int msgid, void *msg, size_t msg_size, long msg_type, int timeout_sec) {for (int i = 0; i < timeout_sec * 10; i++) {if (receive_struct_message_nonblocking(msgid, msg, msg_size, msg_type)) {return true;}// 等待100ms后重試usleep(100 * 1000);}return false;
}// 刪除消息隊列
void remove_message_queue(int msgid) {if (msgctl(msgid, IPC_RMID, NULL) == -1 && errno != EIDRM) {perror("msgctl IPC_RMID failed");}
}// ===================== 使用示例 =====================typedef struct {MSG_HEADER;int counter;char data[64];
} TestMessage;int main() {int msgid = create_message_queue(MSG_QUEUE_KEY);pid_t pid = fork();if (pid < 0) {ERROR_EXIT("fork failed");}if (pid > 0) { // 父進程 - 發送者sleep(1); // 等待接收者準備// 發送3條消息for (int i = 1; i <= 3; i++) {TestMessage msg = {.mtype = 1,.counter = i,.data = "Blocking test"};send_struct_message(msgid, &msg, sizeof(TestMessage));sleep(1);}// 發送快速連續消息for (int i = 4; i <= 6; i++) {TestMessage msg = {.mtype = 2,.counter = i,.data = "Non-blocking test"};send_struct_message(msgid, &msg, sizeof(TestMessage));}// 等待接收者處理sleep(2);remove_message_queue(msgid);} else { // 子進程 - 接收者// 1. 阻塞接收演示printf("=== Blocking Receive Test ===\n");for (int i = 0; i < 3; i++) {TestMessage msg;if (receive_struct_message_blocking(msgid, &msg, sizeof(TestMessage), 1)) {printf("Blocking received: counter=%d, data=%s\n", msg.counter, msg.data);}}// 2. 非阻塞接收演示printf("\n=== Non-blocking Receive Test ===\n");int received = 0;while (received < 3) {TestMessage msg;if (receive_struct_message_nonblocking(msgid, &msg, sizeof(TestMessage), 2)) {printf("Non-blocking received: counter=%d, data=%s\n", msg.counter, msg.data);received++;} else {printf("No message available, doing other work...\n");sleep(1); // 模擬其他工作}}// 3. 超時接收演示printf("\n=== Timeout Receive Test ===\n");TestMessage msg;if (receive_struct_message_timeout(msgid, &msg, sizeof(TestMessage), 3, 2)) {printf("Received message within timeout\n");} else {printf("Timeout waiting for message (type 3)\n");}}return EXIT_SUCCESS;
}