主页 > 其他  > 

【LinuxC/C++开发】Linux系统轻量级的队列缓存mqueue

【LinuxC/C++开发】Linux系统轻量级的队列缓存mqueue
前言

        开发设计时,通常会对业务流程进行模块化,有些流程之间,不要求同步,但又需要传递信息时,如果存储到数据库,效率降低很多,如果是存放在内存是最好的。此时可以选择系统的IPC(进程间通信,如共享内存等),本文讲解的是适合轻量级的队列缓存场景的mqueue。

功能讲解 mqueue特性

        Linux的mqueue(消息队列)是POSIX标准中定义的进程间通信(IPC)机制,允许不同进程通过内核维护的队列传递结构化消息。其具备以下几个特性:

存储在指定文件:mqueue消息队列文件默认挂载在/dev/mqueue目录下。通过mq_open创建的消息队列会在此目录生成对应文件节点,内核使用红黑树管理消息的存储与优先级持久性:POSIX消息队列随系统重启消失可在命令行查看队列信息:cat /dev/mqueue/[队列名]  # 查看队列属性(如最大消息数、消息大小) 功能介绍

需求场景:某些功能需要在root用户下作为服务执行,组装的生产数据需要推送给登录系统桌面的普通用户权限的应用。

下面以在root权限下运行的读取usb信息的服务,监测USB的插拔事件并把信息推送到mqueue,而普通用户的应用通过读取mqueue获取USB插拔信息为例。

获取事件信息写入mqueue #ifndef USBACTION_H_ #define USBACTION_H_ #include <unistd.h> #include <cstring> #include <iostream> #include <sstream> #include <string> #include <algorithm> #include <vector> #include <cstdio> #include <stdio.h> #include <dlfcn.h> #include <string.h> #include <stdlib.h> #include <unistd.h> #include <libudev.h> #include <mqueue.h> using namespace std; #define MAX_NUM 100 struct Message { long mtype; // 消息类型 char mtext[256]; // 消息内容 }; const int MSG_TYPE = 1; const char* QUEUE_NAME = "/usb_msg";//在/dev/mqueue目录下 class Usbaction: public TUtilBase { public: Usbaction(); ~Usbaction(); int Run(); private: // int Init();//初始化 void monitorDevices(); int sendtomqueue(const char* mqstr); unsigned int uSleepTime; //刷新间隔 }; #endif /* USBACTION_H_ */ #include "usbaction.h" std::vector<std::string> split(const std::string& str, char delimiter) { std::vector<std::string> tokens; std::istringstream tokenStream(str); std::string token; while (std::getline(tokenStream, token, delimiter)) { tokens.push_back(token); } return tokens; } Usbaction::Usbaction() { uSleepTime = 5; } Usbaction::~Usbaction() { } int Usbaction::sendtomqueue(const char* mqstr){ mqd_t mq; struct mq_attr attr; attr.mq_flags = O_NONBLOCK; // 设置为非阻塞模式 attr.mq_maxmsg = MAX_NUM; // 队列中最大消息数 attr.mq_msgsize = sizeof(Message);// 消息的最大大小(字节) attr.mq_curmsgs = 0; // 队列中当前消息数 mq = mq_open(QUEUE_NAME, O_CREAT | O_WRONLY, 0644, &attr); if (mq == (mqd_t)-1) { WRITELOG(LOG_ERROR,"mq_open Error"); return 0; } Message msg; msg.mtype = MSG_TYPE; strcpy(msg.mtext,mqstr); //attr.mq_flags |= O_NONBLOCK; if (mq_setattr(mq, &attr, NULL) == -1) { return 0; } if (mq_send(mq, reinterpret_cast<char*>(&msg), sizeof(msg), 0) == -1) { if (errno == EAGAIN) { } else { perror("mq_send"); } }else { std::cout << "Message sent: " << msg.mtext << std::endl; } // 关闭消息队列 mq_close(mq); return 1; } void Usbaction::monitorDevices() { // 创建 udev 对象 struct udev *udev = udev_new(); if (!udev) { return; } // 创建 udev 监视器 struct udev_monitor *mon = udev_monitor_new_from_netlink(udev, "udev"); if (!mon) { udev_unref(udev); return; } // 添加过滤器以匹配usb 子系统 udev_monitor_filter_add_match_subsystem_devtype(mon, "usb", nullptr); udev_monitor_enable_receiving(mon); std::cout << "Monitoring block and USB events. Press Ctrl+C to exit." << std::endl; while (true) { struct udev_device *dev = udev_monitor_receive_device(mon); if (dev) { bool bfind=false; std::string level=""; const char* subsystem = udev_device_get_subsystem(dev); const char* action = udev_device_get_action(dev); const char* devnode = udev_device_get_devnode(dev); const char *pro = udev_device_get_property_value(dev, "ID_USB_INTERFACES"); if (subsystem && action && devnode) { if (strcmp(action,"add")==0||strcmp(action,"remove")==0){ bfind=true; std::vector<std::string> tokens = split(devnode, '/'); // 确保有至少两个分割结果 if (tokens.size() >= 2) { string device = tokens[tokens.size() - 1]; string bus = tokens[tokens.size() - 2]; level = bus + ":" + device; } } } if (bfind) { const char* idVendor = udev_device_get_sysattr_value(dev,"idVendor"); const char* idProduct = udev_device_get_sysattr_value(dev, "idProduct"); if (idVendor && idProduct) {//strcmp(action,"add")==0 char sendbuf[256]={0}; if(pro){ sprintf(sendbuf,"%s,%s,%s:%s,%s",action,level.c_str(),idVendor,idProduct,pro); }else sprintf(sendbuf,"%s,%s,%s:%s",action,level.c_str(),idVendor,idProduct); sendtomqueue(sendbuf); %s",action,level.c_str(),idVendor,idProduct); }else{ char sendbuf[256]={0}; sprintf(sendbuf,"%s,%s",action,level.c_str()); sendtomqueue(sendbuf); } } udev_device_unref(dev); } usleep(500000); // Sleep for 0.5 seconds } udev_monitor_unref(mon); udev_unref(udev); } int Usbaction::Init() { //log if (0== BInit(APP_TYPE_LOG, ACTIVITYCENSUS_LOG, 0)) { return 0; } return 1; } int Usbaction::Run() { //初始化 if (NS_FAILED == Init()) { return 0; } monitorDevices(); return 1; } int main(int argc, char **argv) { Usbaction action; action.Run(); return 0; }

makefile

CC = g++ CFLAGS = DEBUGFLAG = -g -Wall MACRO = #MACRO = -D_DEBUG LIBDIRS = LIBS = -ldl -ludev -lrt INCLUDE = MAKE_SO = OPTIONS = OBJDIR = SRCDIR = RUNOUTPUT = usbaction LIBOUTPUT = OBJS = usbaction.o default:$(RUNOUTPUT) clean: rm -f $(OBJS) $(RUNOUTPUT) install: cp -f $(RUNOUTPUT) ../../bin $(RUNOUTPUT):$(OBJS) $(CC) -o $(RUNOUTPUT) $^ $(OPTIONS) $(LIBDIRS) $(LIBS) .cpp.o: $(CC) $(DEBUGFLAG) $(MACRO) -fPIC -c $< -o $@ $(CFLAGS) $(INCLUDE) 读取mqueue

普通应用权限的应用可以读取root用户权限的mqueue文件,下面是非阻塞式读取队列数据。

#include <iostream> #include <mqueue.h> #include <fcntl.h> #include <sys/stat.h> #include <cstring> #include <unistd.h> struct Message { long mtype; // 消息类型 char mtext[256]; // 消息内容 }; const char* QUEUE_NAME = "/ymore_msg"; // 消息队列的名称 int main() { // 打开消息队列以读取模式 mqd_t mq = mq_open(QUEUE_NAME, O_RDONLY | O_NONBLOCK); if (mq == (mqd_t)-1) { std::cerr << "Error opening message queue: " << strerror(errno) << std::endl; return 1; } Message msg; ssize_t bytes_read; // 循环接收消息 while (true) { bytes_read = mq_receive(mq, reinterpret_cast<char*>(&msg), sizeof(msg), NULL); if (bytes_read == -1) { std::cerr << "Error receiving message: " << strerror(errno) << std::endl; break; } std::cout << "Received message: " << msg.mtext << std::endl; // 如果读取到队列为空,可以根据条件退出 // 例如,使用 `mq_getattr()` 获取队列的当前状态 struct mq_attr attr; if (mq_getattr(mq, &attr) == -1) { std::cerr << "Error getting queue attributes: " << strerror(errno) << std::endl; break; } // 如果队列中没有消息,退出循环 if (attr.mq_curmsgs == 0) { std::cout << "No more messages in the queue." << std::endl; break; } } // 关闭消息队列 mq_close(mq); return 0; } 结尾

        Linux后台C/C++项目,一般在架构设计时,可以设计共享内容来内部处理缓存数据,但也有考虑到第三方应用或者扩展型应用的场景,此时mqueue是比较合适了,如果是高并发的队列缓存,还是得找成熟的队列缓存中间件,比如kafka。

标签:

【LinuxC/C++开发】Linux系统轻量级的队列缓存mqueue由讯客互联其他栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“【LinuxC/C++开发】Linux系统轻量级的队列缓存mqueue