日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

nng源码阅读

發布時間:2024/1/1 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 nng源码阅读 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Core

thread & mutex & cv

nni_mtx

互斥鎖, 提供nni_mtx_init,?nni_mtx_fini,?nni_mtx_lock,?nni_mtx_unlock接口.

nni_cv

條件變量, 提供nni_cv_init,?nni_cv_fini,?nni_cv_wake,?nni_cv_wake1,?nni_cv_until接口.

POSIX下就是pthread的條件變量.

nni_thr

int nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg);

nni_thr_run,?nni_thr_fini,?nni_thr_wait,?nni_thr_is_self

線程有init, start, stop, done4個狀態, 執行nni_thr_init后, 進入init狀態. 執行nni_thr_run進入start狀態. 執行nni_thr_wait后, 首先設置為stop狀態, 然后等待線程變成done狀態.

nni_thr并不直接執行nni_thr_func函數, 而是執行nni_thr_wrap函數, 該函數內部再調用nni_thr_init提供的回調函數.

nni_thr_wrap

  • 如果沒有start并且stop, 那么等待一個線程關聯的條件變量.
  • 如果nni_thr_run設置了start, 那么執行nni_thr_init提供的回調函數.
  • 回調函數執行完成設置為done狀態, 然后wake線程的條件變量.
  • taskq

    taskq為任務隊列, 內部包含一個鏈表, 鏈表中為需要執行的task, 每個taskq里面會有多個線程來處理鏈表里面的task.

    struct nni_taskq {nni_list tq_tasks; // 任務列表, 雙向鏈表實現nni_mtx tq_mtx; // 鎖nni_cv tq_sched_cv; // 條件變量nni_cv tq_wait_cv; // 條件變量nni_taskq_thr *tq_threads; // 線程數組int tq_nthreads; // 線程數bool tq_run; // true表示線程開始執行. };struct nni_task {nni_list_node task_node; // tq_tasks的節點void * task_arg; // 任務的參數nni_cb task_cb; // 該任務的回調函數nni_taskq * task_tq; // 任務所屬taskqnni_thr * task_thr; // 執行任務的線程. non-NULL if the task is runningunsigned task_busy;bool task_prep;bool task_reap; // task執行完后是否需要銷毀.nni_mtx task_mtx; // task的鎖.nni_cv task_cv; };

    nni_taskq_init(nni_taskq **tqp, int nthr)

    創建一個新的taskq, nthr指定taskq關聯的線程數, 進行nni_taskq的初始化. 啟動全部關聯的線程, 線程的函數為nni_taskq_thread.

    nni_taskq_thread

  • 加鎖tq_mtx
  • for的死循環
  • 從taskq的隊列里面取出一個任務
  • 解鎖tq_mtx
  • 執行任務的task_cb.
  • 執行完成, task->task_busy--
  • 如果task_busy為0, 那么喚醒等待task的條件變量
  • 如果需要銷毀task, 那么執行nni_task_fini
  • 如果tq_run為false, 退出循環
  • 等待tq_sched_cv條件變量
  • 解鎖tq_mtx
  • nni_taskq_sys_init

    系統的taskq, 相當于全局的taskq, 其中線程數最少為2, 默認為CPU數乘2, 如果編譯設置NNG_NUM_TASKQ_THREADS宏, 那么線程數就是該宏的值.

    nni_task_init

    創建一個nni_task. 默認情況: task_prep為false, task_reap為false, task_busy為0.

    nni_task_prep

    task_busy ++, task_prep = true

    nni_task_wait

    如果task_busy不為0, 那么等待task_cv條件變量

    nni_task_dispatch

    將task加入到taskq的任務鏈表, 然后執行nni_cv_wake1喚醒tq_sched_cv條件變量, 相當于是異步執行任務.

    nni_task_exec

    執行task_cb回調函數. 相當于是同步執行任務.

    aio

    struct nng_aio {int a_result; // Result code (nng_errno)size_t a_count; // Bytes transferred (I/O only)nni_time a_expire; // Absolute timeoutnni_duration a_timeout; // Relative timeout// These fields are private to the aio framework.bool a_stop; // shutting down (no new operations)bool a_sleep; // sleeping with no actionint a_sleeprv; // result when sleep wakesnni_task *a_task;// Read/write operations.nni_iov *a_iov;unsigned a_niov;nni_iov a_iovinl[4]; // inline IOVs - when the IOV list is shortnni_iov *a_iovalloc; // dynamically allocated IOVsunsigned a_niovalloc; // number of allocated IOVs// Message operations.nni_msg *a_msg;// User scratch data. Consumers may store values here, which// must be preserved by providers and the framework.void *a_user_data[4];// Operation inputs & outputs. Up to 4 inputs and 4 outputs may be// specified. The semantics of these will vary, and depend on the// specific operation.void *a_inputs[4];void *a_outputs[4];// Provider-use fields.nni_aio_cancelfn a_cancel_fn;void * a_cancel_arg;void * a_prov_data;nni_list_node a_prov_node;void * a_prov_extra[4]; // Extra data used by provider// Socket address. This turns out to be very useful, as we wind up// needing socket addresses for numerous connection related routines.// It would be cleaner to not have this and avoid burning the space,// but having this hear dramatically simplifies lots of code.nng_sockaddr a_sockaddr;// Expire node.nni_list_node a_expire_node; };

    nni_aio_init

    主要是創建a_task. 這個task關聯一個回調函數.

    a_expire = NNI_TIME_NEVER a_timeout = NNG_DURATION_INFINITE a_iov = a_iovinl a_niovalloc = 0

    void a_user_data[4];

    可以通過nni_aio_set_data和nni_aio_get_data設置和獲取數據,

    a_inputs, a_outputs

    通過nni_aio_set_input,?nni_aio_get_input,?nni_aio_set_output,?nni_aio_get_output設置和獲取

    nni_aio_begin

    進行一些初始化

    • a_result = 0;
    • a_count = 0;
    • a_cancel_fn = 0;
    • a_outputs = NULL
    • nni_task_prep(aio->a_task)

    nni_aio_finish

    int nni_aio_finish(nni_aio *aio, int result, size_t count)

  • 將aio從全局超時鏈表中刪除: nni_aio_expire_aios.
  • result記錄到aio的a_result, 可以通過nni_aio_result接口獲取.
  • count記錄到aio的a_count(IO操作傳輸的字節數), 通過nni_aio_count接口獲取.
  • a_expire = NNI_TIME_NEVER
  • a_sleep = false
  • 執行aio關聯的回調函數.
  • nni_aio_schedule

    該函數設置aio的a_expire為當前時間加上aio->a_timeout, 如果a_expire不是NNI_TIME_NEVER, 那么會使用函數nni_aio_expire_add將aio 該函數設置aio的a_expire為當前時間加上aio->a_timeout, 如果a_expire不是NNI_TIME_NEVER, 那么會使用函數nni_aio_expire_add將aio 加入到全局nni_aio_expire_aios鏈表中, nni_aio_sys_init里面會創建線程執行函數nni_aio_expire_loop. 該函數設置aio的a_expire為當前時間加上aio->a_timeout, 如果a_expire不是NNI_TIME_NEVER, 那么會使用函數nni_aio_expire_add將aio 該函數設置aio的a_expire為當前時間加上aio->a_timeout, 如果a_expire不是NNI_TIME_NEVER, 那么會使用函數nni_aio_expire_add將aio 加入到全局nni_aio_expire_aios鏈表中, nni_aio_sys_init里面會創建線程執行函數nni_aio_expire_loop.

    nni_aio_expire_loop會......

    nni_pollable

    struct nni_pollable {int p_rfd;int p_wfd;nni_mtx p_lock;bool p_raised;bool p_open; };

    nni_pollable_getfd會使用eventfd創建一個fd, p_rfd和p_wfd都等于這個fd. 設置p_open為true.

    nni_pollable_raise是會對p_wfd寫入數字1, 這樣p_rfd會變為可讀狀態. 會設置p_raised為true

    nni_plat_pipe_clear會從p_rfd里面讀取數字1, 然后清除p_raised狀態.

    msgqueue

    struct nni_msgq {nni_mtx mq_lock;int mq_cap;int mq_alloc; // alloc is cap + 2...int mq_len; // mq_msgs的當前元素個數.int mq_get;int mq_put; // mq_msgs當前可以存放msg的位置(即數組下標)int mq_geterr;bool mq_closed;nni_msg **mq_msgs; // msg指針數組nni_list mq_aio_putq; // 等待寫的aionni_list mq_aio_getq; // 等待讀的aio// Pollable status.nni_pollable *mq_sendable;nni_pollable *mq_recvable; };

    nni_msgq_init

    分配cap+2個nni_msg 指針, 賦值給mq_msgs.

    其他進行初始化.

    nni_msgq_tryput

    將msg放入msgqueue.

  • 如果mq_aio_getq鏈表不為空.
    • 取出一個aio
    • 執行nni_aio_finish_msg(aio, msg)
    • nni_msgq_run_notify(msgqueue)
    • 實際就是將msg設置到aio的a_msg里面, 然后執行aio的回調函數.
  • 如果mq_aio_getq鏈表為空
    • 將msg放入到mq_msgs數組中
  • nni_msgq_aio_put(msgqueue, waio)

  • 將waio放入鏈表mq_aio_putq尾部.
  • 處理mq_aio_putq鏈表
    • 如果mq_aio_getq鏈表里面有raio
    • 那么直接將waio里面的msg交給raio處理
  • 沒有raio, 并且mq_msg數組有空閑地方, 那么將msg放入到mq_msg里面
  • nni_msgq_aio_get(msgqueue, aio)

    類似nni_msgq_aio_get, 從mq_aio_getq里面取出一個raio, 然后從mq_msg或者mq_aio_putq的waio里面獲取一個msg. 獲取到msg后就會執行raio的回調接口.

    如果沒有msg, 那么就將raio放入到mq_aio_getq里面, 等待后面的msg

    nni_msgq_get_recvable和nni_msgq_get_sendable

    protocol

    用于表示一種協議, 實際的協議實現, 必須提供下面的nni_proto對象.

    struct nni_proto {uint32_t proto_version; // Ops vector versionnni_proto_id proto_self; // Our identitynni_proto_id proto_peer; // Peer identityuint32_t proto_flags; // Protocol flagsconst nni_proto_sock_ops *proto_sock_ops; // Per-socket opeationsconst nni_proto_pipe_ops *proto_pipe_ops; // Per-pipe operations.const nni_proto_ctx_ops * proto_ctx_ops; // Context operations.// proto_init, if not NULL, provides a function that initializes// global values. The main purpose of this may be to initialize// protocol option values.int (*proto_init)(void);// proto_fini, if not NULL, is called at shutdown, to release// any resources allocated at proto_init time.void (*proto_fini)(void); };

    proto_flags的值是如下標記

    #define NNI_PROTO_FLAG_RCV 1 // Protocol can receive #define NNI_PROTO_FLAG_SND 2 // Protocol can send #define NNI_PROTO_FLAG_SNDRCV 3 // Protocol can both send & recv #define NNI_PROTO_FLAG_RAW 4 // Protocol is raw #define NNI_PROTO_FLAG_NOMSGQ 8 // Protocol bypasses the upper queues

    nni_proto_open

    該函數由協議調用, 用于創建一個socket.

    每個協議都會有一次初始化過程, 初始化的時候會調用proto_init接口, 然后將nni_proto放入到全局鏈表nni_proto_list中.

    nni_proto_open里面調用nni_sock_open進行實際的創建工作.

    socket

    struct nni_socket {nni_list_node s_node;nni_mtx s_mx;nni_cv s_cv;nni_cv s_close_cv;uint32_t s_id;uint32_t s_flags;unsigned s_refcnt; // protected by global lockvoid * s_data; // Protocol privatenni_msgq *s_uwq; // Upper write queuenni_msgq *s_urq; // Upper read queuenni_proto_id s_self_id;nni_proto_id s_peer_id;nni_proto_pipe_ops s_pipe_ops;nni_proto_sock_ops s_sock_ops;nni_proto_ctx_ops s_ctx_ops;// optionsnni_duration s_sndtimeo; // send timeoutnni_duration s_rcvtimeo; // receive timeoutnni_duration s_reconn; // reconnect timenni_duration s_reconnmax; // max reconnect timesize_t s_rcvmaxsz; // max receive sizenni_list s_options; // opts not handled by sock/protochar s_name[64]; // socket name (legacy compat)char s_scope[24]; // socket scope ("socket%u", 32 bits max)nni_list s_listeners; // active listenersnni_list s_dialers; // active dialersnni_list s_pipes; // active pipes nni_list s_ctxs; // active contexts (protected by global sock_lk)bool s_closing; // Socket is closingbool s_closed; // Socket closed, protected by global lockbool s_ctxwait; // Waiting for contexts to close.nni_mtx s_pipe_cbs_mtx;nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];sock_stats s_stats; };

    nni_sock_create

    創建一個nni_sock對象, 然后進行初始化.

    s->s_sndtimeo = -1;s->s_rcvtimeo = -1;s->s_reconn = NNI_SECOND;s->s_reconnmax = 0;s->s_rcvmaxsz = 0; // unlimited by defaults->s_id = 0;s->s_refcnt = 0;s->s_self_id = proto->proto_self;s->s_peer_id = proto->proto_peer;s->s_flags = proto->proto_flags;s->s_sock_ops = *proto->proto_sock_ops; // 用proto的proto_sock_opss->s_pipe_ops = *proto->proto_pipe_ops; // 用proto的proto_pipe_opss->s_closed = false;s->s_closing = false;if (proto->proto_ctx_ops != NULL) {s->s_ctx_ops = *proto->proto_ctx_ops;}// 鏈表相關初始化NNI_LIST_NODE_INIT(&s->s_node);NNI_LIST_INIT(&s->s_options, nni_sockopt, node);NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);NNI_LIST_INIT(&s->s_pipes, nni_pipe, p_sock_node);NNI_LIST_INIT(&s->s_listeners, nni_listener, l_node);NNI_LIST_INIT(&s->s_dialers, nni_dialer, d_node);// 消息隊列nni_msgq_init(&s->s_uwq, 0)nni_msgq_init(&s->s_urq, 1)// 執行sock_inits->s_sock_ops.sock_init(&s->s_data, s)// 其他為設置options

    nni_sock_open

  • 執行nni_sock_create分配一個nni_sock對象, 并為其分配一個ID.
  • 將創建的對象放入sock_lists鏈表中.
  • 執行s->s_sock_ops.sock_open(s->s_data); sock_open為對應協議的proto_sock_ops的sock_open接口.
  • dialer

    struct nni_dialer {nni_tran_dialer_ops d_ops; // transport opsnni_tran * d_tran; // transport pointervoid * d_data; // transport privateuint32_t d_id; // endpoint idnni_list_node d_node; // per socket listnni_sock * d_sock;nni_url * d_url;nni_pipe * d_pipe; // active pipe (for redialer)int d_refcnt;bool d_closed; // full shutdownbool d_closing;nni_atomic_flag d_started;nni_mtx d_mtx;nni_list d_pipes;nni_aio * d_user_aio;nni_aio * d_con_aio;nni_aio * d_tmo_aio; // backoff timernni_duration d_maxrtime; // maximum time for reconnectnni_duration d_currtime; // current time for reconnectnni_duration d_inirtime; // initial time for reconnectnni_time d_conntime; // time of last good connectnni_reap_item d_reap;nni_dialer_stats d_stats; };

    nni_dialer_create

  • nni_url_parse, 解析url字符串生成nni_url對象.
  • nni_tran_find, 基于nni_url對象, 找到對應的nni_tran對象
  • 創建nni_dialer對象
  • 初始化
  • d->d_url = url;d->d_closed = false;d->d_closing = false;d->d_data = NULL;d->d_refcnt = 1;d->d_sock = s;d->d_tran = tran;nni_atomic_flag_reset(&d->d_started);// Make a copy of the endpoint operations. This allows us to// modify them (to override NULLs for example), and avoids an extra// dereference on hot paths.d->d_ops = *tran->tran_dialer;
  • 初始化相關aio:?nni_aio_init(&d->d_con_aio, dialer_connect_cb, d))
  • 初始化相關aio:?nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d))
  • 執行傳輸層的初始化: d->d_ops.d_init(&d->d_data, url, d))
  • 為dialer分配一個ID
  • 將nni_dialer放入nni_sock的s_dialers鏈表里面
  • nni_dialer_start

    分為2種情況, NONBLOCK和BLOCK模式

  • BLOCK模式
    • nni_aio_init初始化一個新的aio.
    • 執行nni_aio_begin.
    • 設置dialer的d_user_aio為aio
    • 執行dialer_connect_start.
    • 等待aio執行完成, nni_aio_wait
    • 獲取nni_aio_result
    • 執行nni_aio_fini
  • NONBLOCK模式
    • aio為NULL
    • 設置dialer的d_user_aio為NULL
    • 執行: dialer_connect_start
  • 在dialer_connect_start里面執行連接操作:?d->d_ops.d_connect(d->d_data, d->d_conn_aio);

    d_conn_aio的回調函數dialer_connect_cb, 其nni_aio_result的結果有以下情況.

  • 0: 連接成功, 新建了pipe, nni_dialer_add_pipe(d, nni_aio_get_output(aio, 0));
  • NNG_ECLOSED, NNG_ECANCELED,
  • NNG_ECONNREFUSED, NNG_ETIMEDOUT,default: nni_sleep_aio(backoff ? nni_random() % backoff : 0, d->d_tmo_aio);
  • listener

    struct nni_listener {nni_tran_listener_ops l_ops; // transport opsnni_tran * l_tran; // transport pointervoid * l_data; // transport privateuint32_t l_id; // endpoint idnni_list_node l_node; // per socket listnni_sock * l_sock;nni_url * l_url;int l_refcnt;bool l_closed; // full shutdownbool l_closing; // close started (shutdown)nni_atomic_flag l_started;nni_list l_pipes; nni_aio * l_acc_aio;nni_aio * l_tmo_aio;nni_reap_item l_reap;nni_listener_stats l_stats; };

    nni_listener_create

    其過程類似nni_dialer_create.

  • l->l_ops = *tran->tran_listener;
  • nni_aio_init(&l->l_acc_aio, listener_accept_cb, l)
  • nni_aio_init(&l->l_tmo_aio, listener_timer_cb, l)
  • l->l_ops.l_init(&l->l_data, url, l)
  • nni_sock_add_listener(s, l)
  • nni_listener_start

  • l->l_ops.l_bind(l->l_data)
  • 執行l->l_ops.l_accept(l->l_data, l->l_acc_aio);
  • l_acc_aio的回調為listener_accept_cb, 其獲取aio的結果.

  • 0, accept成功, 得到一個pipe, 添加到listener中去. 然后再次開始accept.
  • NNG_ECONNABORTED, NNG_ECONNRESET, NNG_ETIMEDOUT, NNG_EPEERAUTH, 繼續執行accept.
  • NNG_ECLOSED, NNG_ECANCELED, 無動作
  • 其他情況: nni_sleep_aio(100, l->l_tmo_aio); 休眠.
  • pipe

    struct nni_pipe {uint32_t p_id;nni_tran_pipe_ops p_tran_ops;nni_proto_pipe_ops p_proto_ops;void * p_tran_data;void * p_proto_data;nni_list_node p_sock_node;nni_list_node p_ep_node;nni_sock * p_sock;nni_dialer * p_dialer;nni_listener * p_listener;bool p_closed;nni_atomic_flag p_stop;bool p_cbs;int p_refcnt;nni_mtx p_mtx;nni_cv p_cv;nni_reap_item p_reap;nni_pipe_stats p_stats; };

    pipe_create

    pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)

  • 創建一個nni_pipe對象
  • 初始化
  • p->p_tran_ops = *tran->tran_pipe; // 傳輸層操作p->p_tran_data = tdata; // 傳輸層數據p->p_proto_ops = *sock->s_pipe_ops; // 協議層操作p->p_proto_data = NULL;p->p_sock = sock;p->p_closed = false;p->p_cbs = false;p->p_refcnt = 0;st = &p->p_stats;
  • 執行:?p->p_tran_ops.p_init(tdata, p)
  • 執行: `p->p_proto_ops.pipe_init(&p->p_proto_data, p, sock->s_data)
  • 傳輸層inproc

    傳輸層的實現, 必須提供struct nni_tran對象, inproc的該對象內主要是包含tran_dialer, tran_listener, tran_pipe三組回調函數, 以及tran_init用于初始化的函數.

    inproc的實現比較簡單, 內部主要是用nni_msgq進行數據傳輸.

    inproc->tran_dialer

    tran_dialer里面主要是提供init以及connect接口.

    inproc_dialer_init

  • 分配一個inproc_ep對象
  • 初始化inproc_ep對象, 設置ep->addr.
  • inproc_ep_connect

  • nni_begin_aio(aio)
  • 在全局nni_inproc.servers鏈表里面找到支持inproc_ep->addr的server. 這個servers鏈表里面也是inproc_ep對象, 是inproc_listener添加進去的.
  • 將當前inproc_ep, 放入到server->clients鏈表中.
  • 將當前aio, 加入到inproc_ep->aios鏈表中.
  • 執行inproc_accept_clients(server)創建相關的pipe.
  • inproc_accept_clients

  • 創建inproc_pair對象.
  • 對客戶端和服務端分別創建一個inproc_pipe對象, spipe和cpipe
  • 初始化inproc_pair里面的消息隊列nni_msgq.
  • 設置spipe和cpipe的peer成員為對端的proto.
  • 初始化inproc_pair里面的pipes為cpipe和spipe.
  • cpipe->pair = spipe->pair = pair;
  • cpipe->rq = spipe->wq = pair->q[0];
  • cpipe->wq = spipe->rq = pair->q[1];
  • inproc_pipe_recv

  • 執行nni_msgq_aio_get(pipe->rq, aio);
  • inproc_pipe_send

  • 設置aio里面的msg
  • nni_msgq_aio_put(pipe->wq, aio); 將aio放入nni_msgq.
  • nng源碼閱讀 · GitHub

    總結

    以上是生活随笔為你收集整理的nng源码阅读的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。