日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

NNG 通信模式

發布時間:2024/1/1 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 NNG 通信模式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

NNG 是 nanomsg 的繼任版本,純c語言開發,工作模式分為幾種:

1,Pipeline (A One-Way Pipe)

單向通信,類似於生產者消費者模型的消息隊列,消息從推方流向拉方。

#include <stdlib.h> #include <stdio.h> #include <string.h> #include <unistd.h>#include <nng/nng.h> #include <nng/protocol/pipeline0/pull.h> #include <nng/protocol/pipeline0/push.h>#define NODE0 "node0" #define NODE1 "node1"void fatal(const char *func, int rv) {fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));exit(1); }int node0(const char *url) {nng_socket sock;int rv;if ((rv = nng_pull0_open(&sock)) != 0) {fatal("nng_pull0_open", rv);}if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {fatal("nng_listen", rv);}for (;;) {char *buf = NULL;size_t sz;if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {fatal("nng_recv", rv);}printf("NODE0: RECEIVED \"%s\"\n", buf); nng_free(buf, sz);} }int node1(const char *url, char *msg) {int sz_msg = strlen(msg) + 1; // '\0' toonng_socket sock;int rv;int bytes;if ((rv = nng_push0_open(&sock)) != 0) {fatal("nng_push0_open", rv);}if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {fatal("nng_dial", rv);}printf("NODE1: SENDING \"%s\"\n", msg);if ((rv = nng_send(sock, msg, strlen(msg)+1, 0)) != 0) {fatal("nng_send", rv);}sleep(1); // wait for messages to flush before shutting downnng_close(sock);return (0); }int main(int argc, char **argv) {if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))return (node0(argv[2]));if ((argc > 2) && (strcmp(NODE1, argv[1]) == 0))return (node1(argv[2], argv[3]));fprintf(stderr, "Usage: pipeline %s|%s <URL> <ARG> ...'\n",NODE0, NODE1);return (1); }//gcc pipeline.c -lnng -o pipeline

2,Request/Reply (I ask, you answer)

請求-應答模式,如果不應答,請求方會一直請求。

#include <stdlib.h> #include <stdio.h> #include <string.h> #include <time.h>#include <nng/nng.h> #include <nng/protocol/reqrep0/rep.h> #include <nng/protocol/reqrep0/req.h>#define NODE0 "node0" #define NODE1 "node1" #define DATE "DATE"void fatal(const char *func, int rv) {fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));exit(1); }char * date(void) {time_t now = time(&now);struct tm *info = localtime(&now);char *text = asctime(info);text[strlen(text)-1] = '\0'; // remove '\n'return (text); }int node0(const char *url) {nng_socket sock;int rv;if ((rv = nng_rep0_open(&sock)) != 0) {fatal("nng_rep0_open", rv);}if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {fatal("nng_listen", rv);}for (;;) {char *buf = NULL;size_t sz;if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {fatal("nng_recv", rv);}if ((sz == (strlen(DATE) + 1)) && (strcmp(DATE, buf) == 0)) {printf("NODE0: RECEIVED DATE REQUEST\n");char *d = date();printf("NODE0: SENDING DATE %s\n", d);if ((rv = nng_send(sock, d, strlen(d) + 1, 0)) != 0) {fatal("nng_send", rv);}}nng_free(buf, sz);} }int node1(const char *url) {nng_socket sock;int rv;size_t sz;char *buf = NULL;if ((rv = nng_req0_open(&sock)) != 0) {fatal("nng_socket", rv);}if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {fatal("nng_dial", rv);}printf("NODE1: SENDING DATE REQUEST %s\n", DATE);if ((rv = nng_send(sock, DATE, strlen(DATE)+1, 0)) != 0) {fatal("nng_send", rv);}if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {fatal("nng_recv", rv);}printf("NODE1: RECEIVED DATE %s\n", buf); nng_free(buf, sz);nng_close(sock);return (0); }int main(const int argc, const char **argv) {if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))return (node0(argv[2]));if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0))return (node1(argv[2]));fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", NODE0, NODE1);return (1); }

3,Pair (Two Way Radio)

一對一雙向通信,類似對講機

#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h>#include <nng/nng.h> #include <nng/protocol/pair0/pair.h>#define NODE0 "node0" #define NODE1 "node1"void fatal(const char *func, int rv) {fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));exit(1); }int send_name(nng_socket sock, char *name) {int rv;printf("%s: SENDING \"%s\"\n", name, name);if ((rv = nng_send(sock, name, strlen(name) + 1, 0)) != 0) {fatal("nng_send", rv);}return (rv); }int recv_name(nng_socket sock, char *name) {char *buf = NULL;int rv;size_t sz;if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {printf("%s: RECEIVED \"%s\"\n", name, buf); nng_free(buf, sz);}return (rv); }int send_recv(nng_socket sock, char *name) {int rv;if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 100)) != 0) {fatal("nng_setopt_ms", rv);}for (;;) {recv_name(sock, name);sleep(1);send_name(sock, name);} }int node0(const char *url) {nng_socket sock;int rv;if ((rv = nng_pair0_open(&sock)) != 0) {fatal("nng_pair0_open", rv);}if ((rv = nng_listen(sock, url, NULL, 0)) !=0) {fatal("nng_listen", rv);}return (send_recv(sock, NODE0)); }int node1(const char *url) {nng_socket sock;int rv;sleep(1);if ((rv = nng_pair0_open(&sock)) != 0) {fatal("nng_pair0_open", rv);}if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {fatal("nng_dial", rv);}return (send_recv(sock, NODE1)); }int main(int argc, char **argv) {if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))return (node0(argv[2]));if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0))return (node1(argv[2]));fprintf(stderr, "Usage: pair %s|%s <URL> <ARG> ...\n", NODE0, NODE1);return 1; }

4,Pub/Sub (Topics & Broadcast)

單向廣播

#include <stdlib.h> #include <stdio.h> #include <string.h> #include <time.h> #include <unistd.h>#include <nng/nng.h> #include <nng/protocol/pubsub0/pub.h> #include <nng/protocol/pubsub0/sub.h>#define SERVER "server" #define CLIENT "client"void fatal(const char *func, int rv) {fprintf(stderr, "%s: %s\n", func, nng_strerror(rv)); }char * date(void) {time_t now = time(&now);struct tm *info = localtime(&now);char *text = asctime(info);text[strlen(text)-1] = '\0'; // remove '\n'return (text); }int server(const char *url) {nng_socket sock;int rv;if ((rv = nng_pub0_open(&sock)) != 0) {fatal("nng_pub0_open", rv);}if ((rv = nng_listen(sock, url, NULL, 0)) < 0) {fatal("nng_listen", rv);}for (;;) {char *d = date();printf("SERVER: PUBLISHING DATE %s\n", d);if ((rv = nng_send(sock, d, strlen(d) + 1, 0)) != 0) {fatal("nng_send", rv);}sleep(1);} }int client(const char *url, const char *name) {nng_socket sock;int rv;if ((rv = nng_sub0_open(&sock)) != 0) {fatal("nng_sub0_open", rv);}// subscribe to everything (empty means all topics)if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) {fatal("nng_setopt", rv);}if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {fatal("nng_dial", rv);}for (;;) {char *buf = NULL;size_t sz;if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {fatal("nng_recv", rv);}printf("CLIENT (%s): RECEIVED %s\n", name, buf); nng_free(buf, sz);} }int main(const int argc, const char **argv) {if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0))return (server(argv[2]));if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0))return (client (argv[2], argv[3]));fprintf(stderr, "Usage: pubsub %s|%s <URL> <ARG> ...\n",SERVER, CLIENT);return 1; }

5,Survey (Everybody Votes)

用于多節點表決或者服務發現

?

#include <stdio.h> #include <stdlib.h> #include <string.h> #include <time.h> #include <unistd.h>#include <nng/nng.h> #include <nng/protocol/survey0/survey.h> #include <nng/protocol/survey0/respond.h>#define SERVER "server" #define CLIENT "client" #define DATE "DATE"void fatal(const char *func, int rv) {fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));exit(1); }char * date(void) {time_t now = time(&now);struct tm *info = localtime(&now);char *text = asctime(info);text[strlen(text)-1] = '\0'; // remove '\n'return (text); }int server(const char *url) {nng_socket sock;int rv;if ((rv = nng_surveyor0_open(&sock)) != 0) {fatal("nng_surveyor0_open", rv);}if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {fatal("nng_listen", rv);}for (;;) {printf("SERVER: SENDING DATE SURVEY REQUEST\n");if ((rv = nng_send(sock, DATE, strlen(DATE) + 1, 0)) != 0) {fatal("nng_send", rv);}for (;;) {char *buf = NULL;size_t sz;rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);if (rv == NNG_ETIMEDOUT) {break;}if (rv != 0) {fatal("nng_recv", rv);}printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n",buf); nng_free(buf, sz);}printf("SERVER: SURVEY COMPLETE\n");} }int client(const char *url, const char *name) {nng_socket sock;int rv;if ((rv = nng_respondent0_open(&sock)) != 0) {fatal("nng_respondent0_open", rv);}if ((rv = nng_dial(sock, url, NULL, NNG_FLAG_NONBLOCK)) != 0) {fatal("nng_dial", rv);}for (;;) {char *buf = NULL;size_t sz;if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {printf("CLIENT (%s): RECEIVED \"%s\" SURVEY REQUEST\n",name, buf); nng_free(buf, sz);char *d = date();printf("CLIENT (%s): SENDING DATE SURVEY RESPONSE\n",name);if ((rv = nng_send(sock, d, strlen(d) + 1, 0)) != 0) {fatal("nng_send", rv);}}} }int main(const int argc, const char **argv) {if ((argc >= 2) && (strcmp(SERVER, argv[1]) == 0))return (server(argv[2]));if ((argc >= 3) && (strcmp(CLIENT, argv[1]) == 0))return (client(argv[2], argv[3]));fprintf(stderr, "Usage: survey %s|%s <URL> <ARG> ...\n",SERVER, 
CLIENT);return 1; }

6,Bus (Routing)

網狀連接通信,每個加入節點都可以發送/接受廣播消息

#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h>#include <nng/nng.h> #include <nng/protocol/bus0/bus.h>void fatal(const char *func, int rv) {fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));exit(1); }int node(int argc, char **argv) {nng_socket sock;int rv;size_t sz;if ((rv = nng_bus0_open(&sock)) != 0) {fatal("nng_bus0_open", rv);}if ((rv = nng_listen(sock, argv[2], NULL, 0)) != 0) {fatal("nng_listen", rv);}sleep(1); // wait for peers to bindif (argc >= 3) {for (int x = 3; x < argc; x++) {if ((rv = nng_dial(sock, argv[x], NULL, 0)) != 0) {fatal("nng_dial", rv);}}}sleep(1); // wait for connects to establish// SENDsz = strlen(argv[1]) + 1; // '\0' tooprintf("%s: SENDING '%s' ONTO BUS\n", argv[1], argv[1]);if ((rv = nng_send(sock, argv[1], sz, 0)) != 0) {fatal("nng_send", rv);}// RECVfor (;;) {char *buf = NULL;size_t sz;if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) !=0) {if (rv == NNG_ETIMEDOUT) {fatal("nng_recv", rv);}}printf("%s: RECEIVED '%s' FROM BUS\n", argv[1], buf); nng_free(buf, sz);}nng_close(sock);return (0); }int main(int argc, char **argv) {if (argc >= 3) {return (node(argc, argv));}fprintf(stderr, "Usage: bus <NODE_NAME> <URL> <URL> ...\n");return 1; }

?

總結

以上是生活随笔為你收集整理的NNG 通信模式的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。