日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

NNG pair 异步通信

發(fā)布時間:2024/1/1 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 NNG pair 异步通信 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一,利用NNG pair模式,實現異步通信。

二,manager端? 綁定地址,回調函數里 接收 異步消息:

#include <stdint.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <time.h>#include <nng/nng.h> #include <nng/protocol/pair0/pair.h> #include <nng/supplemental/util/platform.h>#include <iostream> #include <thread> #include <chrono> #include <atomic> #include <signal.h> #include <sys/wait.h>using namespace std; using namespace std::chrono;static bool exit_flag = false;void recv_data_callback(void *arg); static void sig_handler(int sig) {exit_flag = true;std::cout << "sig_handler " << exit_flag << endl; }void fatal(const char *func, int rv) {fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));exit(1); }class Manager { public://初始化bool init(){//創(chuàng)建io 并綁定回調函數rv = nng_aio_alloc(&aio, recv_data_callback, this);if (rv < 0){fatal("cannot allocate aio", rv);}//打開rv = nng_pair0_open(&sock);if (rv != 0){fatal("nng_pair0_open", rv);}//設置緩沖區(qū)大小nng_socket_set_int(sock, NNG_OPT_SENDBUF, 2048);nng_socket_set_int(sock, NNG_OPT_RECVBUF, 2048);//開始監(jiān)聽if ((rv = nng_listen(sock, url.c_str(), NULL, 0)) != 0){fatal("nng_listen", rv);}nng_recv_aio(sock, aio);isInit = true;return isInit;}//發(fā)送數據void send(const std::string &msgStr){if (!isInit)return;if (!isInit)return;nng_msg *msg = NULL;nng_msg_alloc(&msg, sizeof(msgStr));memcpy(nng_msg_body(msg), msgStr.c_str(), sizeof(msgStr));nng_sendmsg(sock, msg, 0);}public:nng_socket sock;nng_aio *aio{nullptr};private:int rv;std::string url{"ipc:///tmp/pair"};bool isInit{false}; };void recv_data_callback(void *arg) {int rv = 0;Manager *manager = static_cast<Manager*>(arg);nng_msg *msg = NULL;size_t json_len = 0;char * json_str = NULL;rv = nng_aio_result(manager->aio);if (0 != rv) {fatal("nng_recv error ", rv);}msg = nng_aio_get_msg(manager->aio);json_str = static_cast<char*>(nng_msg_body(msg));json_len = nng_msg_len(msg);std::cout<<"recv_data_callback "<<json_str<<std::endl;nng_msg_free(msg);nng_recv_aio(manager->sock, manager->aio); }int main(int argc, char *grgv[]) {signal(SIGINT, sig_handler);signal(SIGTERM, sig_handler);signal(SIGABRT, sig_handler);Manager manager;if (manager.init()){cout << "init success" << endl;}else{cout << "init failed" << endl;}while (!exit_flag){manager.send("Not bad");this_thread::sleep_for(seconds(1));}return 0; }

三,adapter 端,同步發(fā)送數據,單開一個線程?進行數據的輪詢接收。

#include <nng/nng.h> #include <nng/protocol/pair0/pair.h> #include <nng/supplemental/util/platform.h>#include <iostream> #include <thread> #include <chrono> #include <atomic> #include <signal.h> #include <sys/wait.h> #include <string.h>using namespace std; using namespace std::chrono;static bool exit_flag = false;static void sig_handler(int sig) {exit_flag = true;std::cout << "sig_handler " << exit_flag << endl; }void fatal(const char *func, int rv) {fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));exit(1); }void recv_data_callback(void *arg) { }class Adapter { public://初始化bool init(){//打開rv = nng_pair0_open(&sock);if (rv != 0){fatal("nng_pair0_open", rv);}//設置緩沖區(qū)大小nng_socket_set_int(sock, NNG_OPT_SENDBUF, 2048);nng_socket_set_int(sock, NNG_OPT_RECVBUF, 2048);rv = nng_dial(sock, url.c_str(), &dialer, 0);if (rv != 0){fatal("nng_dial", rv);}isInit = true;return isInit;}//開始接收void start(){if (!isInit)return;std::thread t([&](){while (!isStop){nng_msg * msg = NULL;char * json_str = NULL;nng_recvmsg(sock, &msg, 0);json_str = static_cast<char*>(nng_msg_body(msg));std::cout<<"nng_recvmsg "<<json_str<<std::endl;} });t.detach();}void stop(){isStop = true;cout << "stop " << isStop << endl;}void send(const std::string &msgStr){if (!isInit)return;nng_msg *msg = NULL;nng_msg_alloc(&msg, sizeof(msgStr));memcpy(nng_msg_body(msg), msgStr.c_str(), sizeof(msgStr));nng_sendmsg(sock, msg, 0);}public:nng_socket sock;nng_dialer dialer;std::atomic<bool> isStop{false};private:std::string url{"ipc:///tmp/pair"};int rv;bool isInit{false}; };int main(int argc, char *grgv[]) {signal(SIGINT, sig_handler);signal(SIGTERM, sig_handler);signal(SIGABRT, sig_handler);Adapter adapter;if (adapter.init()){cout << "init success" << endl;}else{cout << "init failed" << endl;}adapter.start();while (!exit_flag){adapter.send("How are you?");this_thread::sleep_for(seconds(1));}adapter.stop();return 0; }

?

3,CMakeLists.txt? 兩端 基本一致

cmake_minimum_required (VERSION 2.8.12) project(adapter) set(TARGET_NAME adapter)find_package(nng CONFIG REQUIRED)find_package(Threads)add_executable(${TARGET_NAME} adapter.cpp) target_link_libraries(${TARGET_NAME} nng::nng) target_compile_definitions(${TARGET_NAME} PRIVATE NNG_ELIDE_DEPRECATED)

總結

以上是生活随笔為你收集整理的NNG pair 异步通信的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。