Linux IPC实践(7) --Posix消息队列
1.?創建/獲取一個消息隊列
#include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> mqd_t mq_open(const char *name, int oflag); //專用于打開一個消息隊列 mqd_t mq_open(const char *name, int oflag, mode_t mode,struct mq_attr *attr);參數:
? ?name:? 消息隊列名字;
? ?oflag: 與open函數類型,?可以是O_RDONLY,?O_WRONLY,?O_RDWR,?還可以按位或上O_CREAT,?O_EXCL,?O_NONBLOCK.
? ?mode: 如果oflag指定了O_CREAT,?需要指定mode參數;
? ?attr: 指定消息隊列的屬性;
返回值:
? ?成功:?返回消息隊列文件描述符;
? ?失敗:?返回-1;
注意-Posix?IPC名字限制:
? ?1.?必須以”/”開頭,?并且后面不能還有”/”,?形如:/file-name;
? ?2.?名字長度不能超過NAME_MAX
? ?3.?鏈接時:Link?with?-lrt.
/**?System?V?消息隊列
通過msgget來創建/打開消息隊列
int?msgget(key_t?key,?int?msgflg);
**/
?
2.?關閉一個消息隊列
int mq_close(mqd_t mqdes); /** System V 消息隊列沒有類似的該函數調用**/3.?刪除一個消息隊列
int mq_unlink(const char *name); /** System V 消息隊列 通過msgctl函數, 并將cmd指定為IPC_RMID來實現 int msgctl(int msqid, int cmd, struct msqid_ds *buf); **/ //示例 int main() {mqd_t mqid = mq_open("/abc", O_CREAT|O_RDONLY, 0666, NULL);if (mqid == -1)err_exit("mq_open error");cout << "mq_open success" << endl;mq_close(mqid);mq_unlink("/abc");cout << "unlink success" << endl; }4.?獲取/設置消息隊列屬性
int mq_getattr(mqd_t mqdes, struct mq_attr *attr); int mq_setattr(mqd_t mqdes, struct mq_attr *newattr,struct mq_attr *oldattr);參數:
? ?newattr:?需要設置的屬性
? ?oldattr:?原來的屬性
//struct mq_attr結構體說明 struct mq_attr {long mq_flags; /* Flags: 0 or O_NONBLOCK */long mq_maxmsg; /* Max. # of messages on queue: 消息隊列能夠保存的消息數 */long mq_msgsize; /* Max. message size (bytes): 消息的最大長度 */long mq_curmsgs; /* # of messages currently in queue: 消息隊列當前保存的消息數 */ };/**?System?V?消息隊列
通過msgctl函數,?并將cmd指定為IPC_STAT/IPC_SET來實現
int?msgctl(int?msqid,?int?cmd,?struct?msqid_ds?*buf);
**/
/** 示例: 獲取消息隊列的屬性 **/ int main(int argc,char **argv) {mqd_t mqid = mq_open("/test", O_RDONLY|O_CREAT, 0666, NULL);if (mqid == -1)err_exit("mq_open error");struct mq_attr attr;if (mq_getattr(mqid, &attr) == -1)err_exit("mq_getattr error");cout << "Max messages on queue: " << attr.mq_maxmsg << endl;cout << "Max message size: " << attr.mq_msgsize << endl;cout << "current messages: " << attr.mq_curmsgs << endl;mq_close(mqid);return 0; }5.?發送消息
int mq_send(mqd_t mqdes, const char *msg_ptr,size_t msg_len, unsigned msg_prio);參數:
? ?msg_ptr:?指向需要發送的消息的指針
? ?msg_len:?消息長度
? ?msg_prio:?消息的優先級
/**?System?V?消息隊列
通過msgsnd函數來實現消息發送
int?msgsnd(int?msqid,?const?void?*msgp,?size_t?msgsz,?int?msgflg);
**/
/** 示例: 向消息隊列中發送消息, prio需要從命令行參數中讀取 **/ struct Student {char name[36];int age; }; int main(int argc,char **argv) {if (argc != 2)err_quit("./send <prio>");mqd_t mqid = mq_open("/test", O_WRONLY|O_CREAT, 0666, NULL);if (mqid == -1)err_exit("mq_open error");struct Student stu = {"xiaofang", 23};unsigned prio = atoi(argv[1]);if (mq_send(mqid, (const char *)&stu, sizeof(stu), prio) == -1)err_exit("mq_send error");mq_close(mqid);return 0; }6.?從消息隊列中讀取消息
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,size_t msg_len, unsigned *msg_prio);參數:
? msg_len:?讀取的消息的長度,?注意:?此值一定要等于mq_attr::mq_msgsize的值,?該值可以通過mq_getattr獲取,?但一般是8192字節 ? ??[this?must?be?greater?than?the?mq_msgsize?attribute?of?the?queue?(see?mq_getattr(3)).]
? msg_prio:?保存獲取的消息的優先級
返回值:
? 成功:?返回讀取的消息的字節數
? 失敗:?返回-1
? 注意:?讀取的永遠是消息隊列中優先級最高的最早的消息,?如果消息隊列為,?如果不指定為非阻塞模式,?則mq_receive會阻塞;
/**?System?V?消息隊列
通過msgrcv函數來實現消息發送的
ssize_t?msgrcv(int?msqid,?void?*msgp,?size_t?msgsz,?long?msgtyp,?int?msgflg);
**/
/** 示例: 從消息隊列中獲取消息 **/ int main(int argc,char **argv) {mqd_t mqid = mq_open("/test", O_RDONLY);if (mqid == -1)err_exit("mq_open error");struct Student buf;int nrcv;unsigned prio;struct mq_attr attr;if (mq_getattr(mqid, &attr) == -1)err_exit("mq_getattr error");if ((nrcv = mq_receive(mqid, (char *)&buf, attr.mq_msgsize, &prio)) == -1)err_exit("mq_receive error");cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "<< buf.name << ", age: " << buf.age << endl;mq_close(mqid);return 0; }7.?建立/刪除消息到達通知事件
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);參數sevp:
? ?NULL:?表示撤銷已注冊通知;
? ?非空:?表示當消息到達且消息隊列當前為空,?那么將得到通知;
通知方式:
? ?1.?產生一個信號,?需要自己綁定
? ?2.?創建一個線程,?執行指定的函數
注意:?這種注冊的方式只是在消息隊列從空到非空時才產生消息通知事件,?而且這種注冊方式是一次性的!
//sigevent結構體 struct sigevent {int sigev_notify; /* Notification method */int sigev_signo; /* Notification signal */union sigval sigev_value; /* Data passed with notification */void (*sigev_notify_function) (union sigval); /* Function used for thread notification (SIGEV_THREAD) */void *sigev_notify_attributes; /* Attributes for notification thread (SIGEV_THREAD) */pid_t sigev_notify_thread_id; /* ID of thread to signal (SIGEV_THREAD_ID) */ }; union sigval /* Data passed with notification */ {int sival_int; /* Integer value */void *sival_ptr; /* Pointer value */ };sigev_notify代表通知的方式:?一般常用兩種取值:SIGEV_SIGNAL,?以信號方式通知;?SIGEV_THREAD,?以線程方式通知
如果以信號方式通知:?則需要設定一下兩個參數:
? ?sigev_signo:?信號的代碼
? ?sigev_value:?信號的附加數據(實時信號)
如果以線程方式通知:?則需要設定以下兩個參數:
? ?sigev_notify_function
? ?sigev_notify_attributes
/**?Posix?IPC所特有的功能,?System?V沒有?**/
/**示例: 將下面程序多運行幾遍, 尤其是當消息隊列”從空->非空”, 多次”從空->非空”, 當消息隊列不空時運行該程序時, 觀察該程序的狀態; **/ mqd_t mqid; long size; void sigHandlerForUSR1(int signo) {//將數據的讀取轉移到對信號SIGUSR1的響應函數中來struct Student buf;int nrcv;unsigned prio;if ((nrcv = mq_receive(mqid, (char *)&buf, size, &prio)) == -1)err_exit("mq_receive error");cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "<< buf.name << ", age: " << buf.age << endl; }int main(int argc,char **argv) {// 安裝信號響應函數if (signal(SIGUSR1, sigHandlerForUSR1) == SIG_ERR)err_exit("signal error");mqid = mq_open("/test", O_RDONLY);if (mqid == -1)err_exit("mq_open error");// 獲取消息的最大長度struct mq_attr attr;if (mq_getattr(mqid, &attr) == -1)err_exit("mq_getattr error");size = attr.mq_msgsize;// 注冊消息到達通知事件struct sigevent event;event.sigev_notify = SIGEV_SIGNAL; //指定以信號方式通知event.sigev_signo = SIGUSR1; //指定以SIGUSR1通知if (mq_notify(mqid, &event) == -1)err_exit("mq_notify error");//死循環, 等待信號到來while (true)pause();mq_close(mqid);return 0; } /** 示例:多次注冊notify, 這樣就能過多次接收消息, 但是還是不能從隊列非空的時候進行接收, 將程序改造如下: **/ mqd_t mqid; long size; struct sigevent event; void sigHandlerForUSR1(int signo) {// 注意: 是在消息被讀走之前進行注冊,// 不然該程序就感應不到消息隊列"從空->非空"的一個過程變化了if (mq_notify(mqid, &event) == -1)err_exit("mq_notify error");//將數據的讀取轉移到對信號SIGUSR1的響應函數中來struct Student buf;int nrcv;unsigned prio;if ((nrcv = mq_receive(mqid, (char *)&buf, size, &prio)) == -1)err_exit("mq_receive error");cout << "receive " << nrcv << " bytes, priority: " << prio << ", name: "<< buf.name << ", age: " << buf.age << endl; }int main(int argc,char **argv) {// 安裝信號響應函數if (signal(SIGUSR1, sigHandlerForUSR1) == SIG_ERR)err_exit("signal error");mqid = mq_open("/test", O_RDONLY);if (mqid == -1)err_exit("mq_open error");// 獲取消息的最大長度struct mq_attr attr;if (mq_getattr(mqid, &attr) == -1)err_exit("mq_getattr error");size = attr.mq_msgsize;// 注冊消息到達通知事件event.sigev_notify = SIGEV_SIGNAL; //指定以信號方式通知event.sigev_signo = SIGUSR1; //指定以SIGUSR1通知if (mq_notify(mqid, &event) == -1)err_exit("mq_notify error");//死循環, 等待信號到來while (true)pause();mq_close(mqid);return 0; }mq_notify?注意點總結:
? ?1.?任何時刻只能有一個進程可以被注冊為接收某個給定隊列的通知;
? ?2.?當有一個消息到達某個先前為空的隊列,?而且已有一個進程被注冊為接收該隊列的通知時,?只有沒有任何線程阻塞在該隊列的mq_receive調用的前提下,?通知才會發出;
? ?3.?當通知被發送給它的注冊進程時,?該進程的注冊被撤銷.?進程必須再次調用mq_notify以重新注冊(如果需要的話),但是要注意:?重新注冊要放在從消息隊列讀出消息之前而不是之后(如同示例程序);
?
附-查看已經成功創建的Posix消息隊列
#其存在與一個虛擬文件系統中,?需要將其掛載到系統中才能查看
? Mounting?the?message?queue?filesystem?On?Linux,?message?queues?are?created?in?a?virtual?filesystem. ?
(Other?implementations?may?also??provide?such?a?feature,?but?the?details?are?likely?to?differ.)??This?
file?system?can?be?mounted?(by?the?superuser,?注意是使用root用戶才能成功)?using?the?following?commands:
mkdir?/dev/mqueue
mount?-t?mqueue?none?/dev/mqueue
還可以使用cat查看該消息隊列的狀態,?rm刪除:
cat?/dev/mqueue/abc?
rm?abc
還可umount該文件系統
umount?/dev/mqueue
?
附-Makefile
.PHONY: clean all CC = g++ CPPFLAGS = -Wall -g BIN = main SOURCES = $(BIN.=.cpp) all: $(BIN)%.o: %.c$(CC) $(CPPFLAGS) -c $^ -o $@ main: main.o$(CC) $(CPPFLAGS) $^ -lrt -o $@clean:-rm -rf $(BIN) *.o bin/ obj/ core總結
以上是生活随笔為你收集整理的Linux IPC实践(7) --Posix消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: android aidl调用进程间服务的
- 下一篇: Linux信号实践(1) --Linux