nng源码阅读
Core
thread & mutex & cv
nni_mtx
互斥鎖, 提供nni_mtx_init,?nni_mtx_fini,?nni_mtx_lock,?nni_mtx_unlock接口.
nni_cv
條件變量, 提供nni_cv_init,?nni_cv_fini,?nni_cv_wake,?nni_cv_wake1,?nni_cv_until接口.
POSIX下就是pthread的條件變量.
nni_thr
int nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg);nni_thr_run,?nni_thr_fini,?nni_thr_wait,?nni_thr_is_self
線程有init, start, stop, done4個狀態, 執行nni_thr_init后, 進入init狀態. 執行nni_thr_run進入start狀態. 執行nni_thr_wait后, 首先設置為stop狀態, 然后等待線程變成done狀態.
nni_thr并不直接執行nni_thr_func函數, 而是執行nni_thr_wrap函數, 該函數內部再調用nni_thr_init提供的回調函數.
nni_thr_wrap
taskq
taskq為任務隊列, 內部包含一個鏈表, 鏈表中為需要執行的task, 每個taskq里面會有多個線程來處理鏈表里面的task.
struct nni_taskq {nni_list tq_tasks; // 任務列表, 雙向鏈表實現nni_mtx tq_mtx; // 鎖nni_cv tq_sched_cv; // 條件變量nni_cv tq_wait_cv; // 條件變量nni_taskq_thr *tq_threads; // 線程數組int tq_nthreads; // 線程數bool tq_run; // true表示線程開始執行. };struct nni_task {nni_list_node task_node; // tq_tasks的節點void * task_arg; // 任務的參數nni_cb task_cb; // 該任務的回調函數nni_taskq * task_tq; // 任務所屬taskqnni_thr * task_thr; // 執行任務的線程. non-NULL if the task is runningunsigned task_busy;bool task_prep;bool task_reap; // task執行完后是否需要銷毀.nni_mtx task_mtx; // task的鎖.nni_cv task_cv; };nni_taskq_init(nni_taskq **tqp, int nthr)
創建一個新的taskq, nthr指定taskq關聯的線程數, 進行nni_taskq的初始化. 啟動全部關聯的線程, 線程的函數為nni_taskq_thread.
nni_taskq_thread
nni_taskq_sys_init
系統的taskq, 相當于全局的taskq, 其中線程數最少為2, 默認為CPU數乘2, 如果編譯設置NNG_NUM_TASKQ_THREADS宏, 那么線程數就是該宏的值.
nni_task_init
創建一個nni_task. 默認情況: task_prep為false, task_reap為false, task_busy為0.
nni_task_prep
task_busy ++, task_prep = true
nni_task_wait
如果task_busy不為0, 那么等待task_cv條件變量
nni_task_dispatch
將task加入到taskq的任務鏈表, 然后執行nni_cv_wake1喚醒tq_sched_cv條件變量, 相當于是異步執行任務.
nni_task_exec
執行task_cb回調函數. 相當于是同步執行任務.
aio
struct nng_aio {int a_result; // Result code (nng_errno)size_t a_count; // Bytes transferred (I/O only)nni_time a_expire; // Absolute timeoutnni_duration a_timeout; // Relative timeout// These fields are private to the aio framework.bool a_stop; // shutting down (no new operations)bool a_sleep; // sleeping with no actionint a_sleeprv; // result when sleep wakesnni_task *a_task;// Read/write operations.nni_iov *a_iov;unsigned a_niov;nni_iov a_iovinl[4]; // inline IOVs - when the IOV list is shortnni_iov *a_iovalloc; // dynamically allocated IOVsunsigned a_niovalloc; // number of allocated IOVs// Message operations.nni_msg *a_msg;// User scratch data. Consumers may store values here, which// must be preserved by providers and the framework.void *a_user_data[4];// Operation inputs & outputs. Up to 4 inputs and 4 outputs may be// specified. The semantics of these will vary, and depend on the// specific operation.void *a_inputs[4];void *a_outputs[4];// Provider-use fields.nni_aio_cancelfn a_cancel_fn;void * a_cancel_arg;void * a_prov_data;nni_list_node a_prov_node;void * a_prov_extra[4]; // Extra data used by provider// Socket address. This turns out to be very useful, as we wind up// needing socket addresses for numerous connection related routines.// It would be cleaner to not have this and avoid burning the space,// but having this hear dramatically simplifies lots of code.nng_sockaddr a_sockaddr;// Expire node.nni_list_node a_expire_node; };nni_aio_init
主要是創建a_task. 這個task關聯一個回調函數.
a_expire = NNI_TIME_NEVER a_timeout = NNG_DURATION_INFINITE a_iov = a_iovinl a_niovalloc = 0
void a_user_data[4];
可以通過nni_aio_set_data和nni_aio_get_data設置和獲取數據,
a_inputs, a_outputs
通過nni_aio_set_input,?nni_aio_get_input,?nni_aio_set_output,?nni_aio_get_output設置和獲取
nni_aio_begin
進行一些初始化
- a_result = 0;
- a_count = 0;
- a_cancel_fn = 0;
- a_outputs = NULL
- nni_task_prep(aio->a_task)
nni_aio_finish
int nni_aio_finish(nni_aio *aio, int result, size_t count)
nni_aio_schedule
該函數設置aio的a_expire為當前時間加上aio->a_timeout, 如果a_expire不是NNI_TIME_NEVER, 那么會使用函數nni_aio_expire_add將aio 該函數設置aio的a_expire為當前時間加上aio->a_timeout, 如果a_expire不是NNI_TIME_NEVER, 那么會使用函數nni_aio_expire_add將aio 加入到全局nni_aio_expire_aios鏈表中, nni_aio_sys_init里面會創建線程執行函數nni_aio_expire_loop. 該函數設置aio的a_expire為當前時間加上aio->a_timeout, 如果a_expire不是NNI_TIME_NEVER, 那么會使用函數nni_aio_expire_add將aio 該函數設置aio的a_expire為當前時間加上aio->a_timeout, 如果a_expire不是NNI_TIME_NEVER, 那么會使用函數nni_aio_expire_add將aio 加入到全局nni_aio_expire_aios鏈表中, nni_aio_sys_init里面會創建線程執行函數nni_aio_expire_loop.
nni_aio_expire_loop會......
nni_pollable
struct nni_pollable {int p_rfd;int p_wfd;nni_mtx p_lock;bool p_raised;bool p_open; };nni_pollable_getfd會使用eventfd創建一個fd, p_rfd和p_wfd都等于這個fd. 設置p_open為true.
nni_pollable_raise是會對p_wfd寫入數字1, 這樣p_rfd會變為可讀狀態. 會設置p_raised為true
nni_plat_pipe_clear會從p_rfd里面讀取數字1, 然后清除p_raised狀態.
msgqueue
struct nni_msgq {nni_mtx mq_lock;int mq_cap;int mq_alloc; // alloc is cap + 2...int mq_len; // mq_msgs的當前元素個數.int mq_get;int mq_put; // mq_msgs當前可以存放msg的位置(即數組下標)int mq_geterr;bool mq_closed;nni_msg **mq_msgs; // msg指針數組nni_list mq_aio_putq; // 等待寫的aionni_list mq_aio_getq; // 等待讀的aio// Pollable status.nni_pollable *mq_sendable;nni_pollable *mq_recvable; };nni_msgq_init
分配cap+2個nni_msg 指針, 賦值給mq_msgs.
其他進行初始化.
nni_msgq_tryput
將msg放入msgqueue.
- 取出一個aio
- 執行nni_aio_finish_msg(aio, msg)
- nni_msgq_run_notify(msgqueue)
- 實際就是將msg設置到aio的a_msg里面, 然后執行aio的回調函數.
- 將msg放入到mq_msgs數組中
nni_msgq_aio_put(msgqueue, waio)
- 如果mq_aio_getq鏈表里面有raio
- 那么直接將waio里面的msg交給raio處理
nni_msgq_aio_get(msgqueue, aio)
類似nni_msgq_aio_get, 從mq_aio_getq里面取出一個raio, 然后從mq_msg或者mq_aio_putq的waio里面獲取一個msg. 獲取到msg后就會執行raio的回調接口.
如果沒有msg, 那么就將raio放入到mq_aio_getq里面, 等待后面的msg
nni_msgq_get_recvable和nni_msgq_get_sendable
protocol
用于表示一種協議, 實際的協議實現, 必須提供下面的nni_proto對象.
struct nni_proto {uint32_t proto_version; // Ops vector versionnni_proto_id proto_self; // Our identitynni_proto_id proto_peer; // Peer identityuint32_t proto_flags; // Protocol flagsconst nni_proto_sock_ops *proto_sock_ops; // Per-socket opeationsconst nni_proto_pipe_ops *proto_pipe_ops; // Per-pipe operations.const nni_proto_ctx_ops * proto_ctx_ops; // Context operations.// proto_init, if not NULL, provides a function that initializes// global values. The main purpose of this may be to initialize// protocol option values.int (*proto_init)(void);// proto_fini, if not NULL, is called at shutdown, to release// any resources allocated at proto_init time.void (*proto_fini)(void); };proto_flags的值是如下標記
#define NNI_PROTO_FLAG_RCV 1 // Protocol can receive #define NNI_PROTO_FLAG_SND 2 // Protocol can send #define NNI_PROTO_FLAG_SNDRCV 3 // Protocol can both send & recv #define NNI_PROTO_FLAG_RAW 4 // Protocol is raw #define NNI_PROTO_FLAG_NOMSGQ 8 // Protocol bypasses the upper queuesnni_proto_open
該函數由協議調用, 用于創建一個socket.
每個協議都會有一次初始化過程, 初始化的時候會調用proto_init接口, 然后將nni_proto放入到全局鏈表nni_proto_list中.
nni_proto_open里面調用nni_sock_open進行實際的創建工作.
socket
struct nni_socket {nni_list_node s_node;nni_mtx s_mx;nni_cv s_cv;nni_cv s_close_cv;uint32_t s_id;uint32_t s_flags;unsigned s_refcnt; // protected by global lockvoid * s_data; // Protocol privatenni_msgq *s_uwq; // Upper write queuenni_msgq *s_urq; // Upper read queuenni_proto_id s_self_id;nni_proto_id s_peer_id;nni_proto_pipe_ops s_pipe_ops;nni_proto_sock_ops s_sock_ops;nni_proto_ctx_ops s_ctx_ops;// optionsnni_duration s_sndtimeo; // send timeoutnni_duration s_rcvtimeo; // receive timeoutnni_duration s_reconn; // reconnect timenni_duration s_reconnmax; // max reconnect timesize_t s_rcvmaxsz; // max receive sizenni_list s_options; // opts not handled by sock/protochar s_name[64]; // socket name (legacy compat)char s_scope[24]; // socket scope ("socket%u", 32 bits max)nni_list s_listeners; // active listenersnni_list s_dialers; // active dialersnni_list s_pipes; // active pipes nni_list s_ctxs; // active contexts (protected by global sock_lk)bool s_closing; // Socket is closingbool s_closed; // Socket closed, protected by global lockbool s_ctxwait; // Waiting for contexts to close.nni_mtx s_pipe_cbs_mtx;nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];sock_stats s_stats; };nni_sock_create
創建一個nni_sock對象, 然后進行初始化.
s->s_sndtimeo = -1;s->s_rcvtimeo = -1;s->s_reconn = NNI_SECOND;s->s_reconnmax = 0;s->s_rcvmaxsz = 0; // unlimited by defaults->s_id = 0;s->s_refcnt = 0;s->s_self_id = proto->proto_self;s->s_peer_id = proto->proto_peer;s->s_flags = proto->proto_flags;s->s_sock_ops = *proto->proto_sock_ops; // 用proto的proto_sock_opss->s_pipe_ops = *proto->proto_pipe_ops; // 用proto的proto_pipe_opss->s_closed = false;s->s_closing = false;if (proto->proto_ctx_ops != NULL) {s->s_ctx_ops = *proto->proto_ctx_ops;}// 鏈表相關初始化NNI_LIST_NODE_INIT(&s->s_node);NNI_LIST_INIT(&s->s_options, nni_sockopt, node);NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);NNI_LIST_INIT(&s->s_pipes, nni_pipe, p_sock_node);NNI_LIST_INIT(&s->s_listeners, nni_listener, l_node);NNI_LIST_INIT(&s->s_dialers, nni_dialer, d_node);// 消息隊列nni_msgq_init(&s->s_uwq, 0)nni_msgq_init(&s->s_urq, 1)// 執行sock_inits->s_sock_ops.sock_init(&s->s_data, s)// 其他為設置optionsnni_sock_open
dialer
struct nni_dialer {nni_tran_dialer_ops d_ops; // transport opsnni_tran * d_tran; // transport pointervoid * d_data; // transport privateuint32_t d_id; // endpoint idnni_list_node d_node; // per socket listnni_sock * d_sock;nni_url * d_url;nni_pipe * d_pipe; // active pipe (for redialer)int d_refcnt;bool d_closed; // full shutdownbool d_closing;nni_atomic_flag d_started;nni_mtx d_mtx;nni_list d_pipes;nni_aio * d_user_aio;nni_aio * d_con_aio;nni_aio * d_tmo_aio; // backoff timernni_duration d_maxrtime; // maximum time for reconnectnni_duration d_currtime; // current time for reconnectnni_duration d_inirtime; // initial time for reconnectnni_time d_conntime; // time of last good connectnni_reap_item d_reap;nni_dialer_stats d_stats; };nni_dialer_create
nni_dialer_start
分為2種情況, NONBLOCK和BLOCK模式
- nni_aio_init初始化一個新的aio.
- 執行nni_aio_begin.
- 設置dialer的d_user_aio為aio
- 執行dialer_connect_start.
- 等待aio執行完成, nni_aio_wait
- 獲取nni_aio_result
- 執行nni_aio_fini
- aio為NULL
- 設置dialer的d_user_aio為NULL
- 執行: dialer_connect_start
在dialer_connect_start里面執行連接操作:?d->d_ops.d_connect(d->d_data, d->d_conn_aio);
d_conn_aio的回調函數dialer_connect_cb, 其nni_aio_result的結果有以下情況.
listener
struct nni_listener {nni_tran_listener_ops l_ops; // transport opsnni_tran * l_tran; // transport pointervoid * l_data; // transport privateuint32_t l_id; // endpoint idnni_list_node l_node; // per socket listnni_sock * l_sock;nni_url * l_url;int l_refcnt;bool l_closed; // full shutdownbool l_closing; // close started (shutdown)nni_atomic_flag l_started;nni_list l_pipes; nni_aio * l_acc_aio;nni_aio * l_tmo_aio;nni_reap_item l_reap;nni_listener_stats l_stats; };nni_listener_create
其過程類似nni_dialer_create.
nni_listener_start
l_acc_aio的回調為listener_accept_cb, 其獲取aio的結果.
pipe
struct nni_pipe {uint32_t p_id;nni_tran_pipe_ops p_tran_ops;nni_proto_pipe_ops p_proto_ops;void * p_tran_data;void * p_proto_data;nni_list_node p_sock_node;nni_list_node p_ep_node;nni_sock * p_sock;nni_dialer * p_dialer;nni_listener * p_listener;bool p_closed;nni_atomic_flag p_stop;bool p_cbs;int p_refcnt;nni_mtx p_mtx;nni_cv p_cv;nni_reap_item p_reap;nni_pipe_stats p_stats; };pipe_create
pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran, void *tdata)
傳輸層inproc
傳輸層的實現, 必須提供struct nni_tran對象, inproc的該對象內主要是包含tran_dialer, tran_listener, tran_pipe三組回調函數, 以及tran_init用于初始化的函數.
inproc的實現比較簡單, 內部主要是用nni_msgq進行數據傳輸.
inproc->tran_dialer
tran_dialer里面主要是提供init以及connect接口.
inproc_dialer_init
inproc_ep_connect
inproc_accept_clients
inproc_pipe_recv
inproc_pipe_send
nng源碼閱讀 · GitHub
總結
- 上一篇: 洛谷—— P2934 [USACO09J
- 下一篇: SCI投稿过程总结、投稿状态解析、修稿处