前言
? ? ? ? 開發設計時,通常會對業務流程進行模塊化,有些流程之間,不要求同步,但又需要傳遞信息時,如果存儲到數據庫,效率降低很多,如果是存放在內存是最好的。此時可以選擇系統的IPC(進程間通信,如共享內存等),本文講解的是適合輕量級的隊列緩存場景的mqueue。
功能講解
mqueue特性
????????Linux的mqueue(消息隊列)是POSIX標準中定義的進程間通信(IPC)機制,允許不同進程通過內核維護的隊列傳遞結構化消息。其具備以下幾個特性:
- 存儲在指定文件:
mqueue
消息隊列文件默認掛載在/dev/mqueue
目錄下。通過mq_open
創建的消息隊列會在此目錄生成對應文件節點,內核使用紅黑樹管理消息的存儲與優先級 - 持久性:POSIX消息隊列隨系統重啟消失
- 可在命令行查看隊列信息:cat /dev/mqueue/[隊列名] ?# 查看隊列屬性(如最大消息數、消息大小)
功能介紹
需求場景:某些功能需要在root用戶下作為服務執行,組裝的生產數據需要推送給登錄系統桌面的普通用戶權限的應用。
下面以在root權限下運行的讀取usb信息的服務,監測USB的插拔事件并把信息推送到mqueue,而普通用戶的應用通過讀取mqueue獲取USB插拔信息為例。
獲取事件信息寫入mqueue
#ifndef USBACTION_H_
#define USBACTION_H_
#include <unistd.h>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <algorithm>
#include <vector>#include <cstdio>
#include <stdio.h>
#include <dlfcn.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <libudev.h>
#include <mqueue.h>
using namespace std;
#define MAX_NUM 100struct Message {long mtype; // 消息類型char mtext[256]; // 消息內容
};const int MSG_TYPE = 1;
const char* QUEUE_NAME = "/usb_msg";//在/dev/mqueue目錄下class Usbaction: public TUtilBase {
public:Usbaction();~Usbaction();int Run();private://int Init();//初始化void monitorDevices();int sendtomqueue(const char* mqstr);unsigned int uSleepTime; //刷新間隔};#endif /* USBACTION_H_ */
#include "usbaction.h"std::vector<std::string> split(const std::string& str, char delimiter) {std::vector<std::string> tokens;std::istringstream tokenStream(str);std::string token;while (std::getline(tokenStream, token, delimiter)) {tokens.push_back(token);}return tokens;
}Usbaction::Usbaction() {uSleepTime = 5;
}Usbaction::~Usbaction() {
}int Usbaction::sendtomqueue(const char* mqstr){mqd_t mq;struct mq_attr attr;attr.mq_flags = O_NONBLOCK; // 設置為非阻塞模式attr.mq_maxmsg = MAX_NUM; // 隊列中最大消息數attr.mq_msgsize = sizeof(Message);// 消息的最大大小(字節)attr.mq_curmsgs = 0; // 隊列中當前消息數mq = mq_open(QUEUE_NAME, O_CREAT | O_WRONLY, 0644, &attr);if (mq == (mqd_t)-1) {WRITELOG(LOG_ERROR,"mq_open Error");return 0;}Message msg;msg.mtype = MSG_TYPE;strcpy(msg.mtext,mqstr);//attr.mq_flags |= O_NONBLOCK;if (mq_setattr(mq, &attr, NULL) == -1) {return 0;}if (mq_send(mq, reinterpret_cast<char*>(&msg), sizeof(msg), 0) == -1) {if (errno == EAGAIN) {} else {perror("mq_send");}}else {std::cout << "Message sent: " << msg.mtext << std::endl;}// 關閉消息隊列mq_close(mq);return 1;
}void Usbaction::monitorDevices() {// 創建 udev 對象struct udev *udev = udev_new();if (!udev) {return;}// 創建 udev 監視器struct udev_monitor *mon = udev_monitor_new_from_netlink(udev, "udev");if (!mon) {udev_unref(udev);return;}// 添加過濾器以匹配usb 子系統udev_monitor_filter_add_match_subsystem_devtype(mon, "usb", nullptr);udev_monitor_enable_receiving(mon);std::cout << "Monitoring block and USB events. Press Ctrl+C to exit." << std::endl;while (true) {struct udev_device *dev = udev_monitor_receive_device(mon);if (dev) {bool bfind=false;std::string level="";const char* subsystem = udev_device_get_subsystem(dev);const char* action = udev_device_get_action(dev);const char* devnode = udev_device_get_devnode(dev);const char *pro = udev_device_get_property_value(dev, "ID_USB_INTERFACES");if (subsystem && action && devnode) {if (strcmp(action,"add")==0||strcmp(action,"remove")==0){bfind=true;std::vector<std::string> tokens = split(devnode, '/');// 確保有至少兩個分割結果if (tokens.size() >= 2) {string device = tokens[tokens.size() - 1];string bus = tokens[tokens.size() - 2];level = bus + ":" + device;}}}if (bfind) {const char* idVendor = udev_device_get_sysattr_value(dev,"idVendor");const char* idProduct = udev_device_get_sysattr_value(dev, "idProduct");if (idVendor && idProduct) {//strcmp(action,"add")==0char sendbuf[256]={0};if(pro){sprintf(sendbuf,"%s,%s,%s:%s,%s",action,level.c_str(),idVendor,idProduct,pro);}elsesprintf(sendbuf,"%s,%s,%s:%s",action,level.c_str(),idVendor,idProduct);sendtomqueue(sendbuf);
%s",action,level.c_str(),idVendor,idProduct);}else{char sendbuf[256]={0};sprintf(sendbuf,"%s,%s",action,level.c_str());sendtomqueue(sendbuf);}}udev_device_unref(dev);}usleep(500000); // Sleep for 0.5 seconds}udev_monitor_unref(mon);udev_unref(udev);
}int Usbaction::Init() {//logif (0== BInit(APP_TYPE_LOG, ACTIVITYCENSUS_LOG, 0)) {return 0;}return 1;
}int Usbaction::Run() {//初始化if (NS_FAILED == Init()) {return 0;}monitorDevices();return 1;
}int main(int argc, char **argv) {Usbaction action;action.Run();return 0;}
makefile
CC = g++
CFLAGS =
DEBUGFLAG = -g -Wall
MACRO =
#MACRO = -D_DEBUG
LIBDIRS =
LIBS = -ldl -ludev -lrt
INCLUDE =
MAKE_SO =
OPTIONS =
OBJDIR =
SRCDIR =
RUNOUTPUT = usbaction
LIBOUTPUT =
OBJS = usbaction.odefault:$(RUNOUTPUT)clean:rm -f $(OBJS) $(RUNOUTPUT)install:cp -f $(RUNOUTPUT) ../../bin$(RUNOUTPUT):$(OBJS)$(CC) -o $(RUNOUTPUT) $^ $(OPTIONS) $(LIBDIRS) $(LIBS).cpp.o:$(CC) $(DEBUGFLAG) $(MACRO) -fPIC -c $< -o $@ $(CFLAGS) $(INCLUDE)
讀取mqueue
普通應用權限的應用可以讀取root用戶權限的mqueue文件,下面是非阻塞式讀取隊列數據。
#include <iostream>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <cstring>
#include <unistd.h>struct Message {long mtype; // 消息類型char mtext[256]; // 消息內容
};const char* QUEUE_NAME = "/ymore_msg"; // 消息隊列的名稱int main() {// 打開消息隊列以讀取模式mqd_t mq = mq_open(QUEUE_NAME, O_RDONLY | O_NONBLOCK);if (mq == (mqd_t)-1) {std::cerr << "Error opening message queue: " << strerror(errno) << std::endl;return 1;}Message msg;ssize_t bytes_read;// 循環接收消息while (true) {bytes_read = mq_receive(mq, reinterpret_cast<char*>(&msg), sizeof(msg), NULL);if (bytes_read == -1) {std::cerr << "Error receiving message: " << strerror(errno) << std::endl;break;}std::cout << "Received message: " << msg.mtext << std::endl;// 如果讀取到隊列為空,可以根據條件退出// 例如,使用 `mq_getattr()` 獲取隊列的當前狀態struct mq_attr attr;if (mq_getattr(mq, &attr) == -1) {std::cerr << "Error getting queue attributes: " << strerror(errno) << std::endl;break;}// 如果隊列中沒有消息,退出循環if (attr.mq_curmsgs == 0) {std::cout << "No more messages in the queue." << std::endl;break;}}// 關閉消息隊列mq_close(mq);return 0;
}
結尾
? ? ? ? Linux后臺C/C++項目,一般在架構設計時,可以設計共享內容來內部處理緩存數據,但也有考慮到第三方應用或者擴展型應用的場景,此時mqueue是比較合適了,如果是高并發的隊列緩存,還是得找成熟的隊列緩存中間件,比如kafka。