
Nginx-rtmp live streaming: a fairly detailed analysis of the workflow

Published: 2024/2/28

1. Overview

1.1 How the live stream works

OBS pushes a live stream to nginx. The ngx_rtmp_live_module module of nginx-rtmp forwards that stream to the application live,
and VLC then connects to live to play the stream.

1.2 nginx.conf

# Number of worker processes to create
worker_processes 1;

error_log stderr debug;
daemon off;
master_process off;

events {
    worker_connections 1024;
}

rtmp {
    server {
        listen 1935;         # RTMP port
        chunk_size 4096;     # chunk size used for data transfer

        application live {   # live streaming application
            live on;
        }

        # OBS pushes the stream to this push application, which republishes it to the live application
        application push {
            live on;
            push rtmp://192.168.1.82:1935/live;   # push to the live application above
        }
    }
}

1.3 OBS push settings

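  • Click "+" to choose a media source, confirm, and then configure that media source (screenshot in the original post).

  • Click "Settings", choose "Stream", and set the push URL (screenshot in the original post). After confirming, you can start pushing.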

1.4 Playing the live stream with VLC

    2. Source analysis: application push

    We start with the complete flow of OBS pushing an RTMP stream to the nginx server.

    2.1 Listening for connections

    After nginx starts, it sleeps in the epoll_wait call inside ngx_process_events (ngx_epoll_process_events when epoll is in use), waiting for client connections:

    static ngx_int_t
    ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
    {
        ...

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll timer: %M", timer);

        /* When nginx first runs, timer is -1, i.e. wait indefinitely for a client connection */
        events = epoll_wait(ep, event_list, (int) nevents, timer);

        ...

        for (i = 0; i < events; i++) {
            c = event_list[i].data.ptr;

            instance = (uintptr_t) c & 1;
            c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

            /* The read event being monitored */
            rev = c->read;

            /* The event flags returned by epoll_wait */
            revents = event_list[i].events;

            ...

            /* The monitored event is readable; on a listening socket this means
             * a new connection has arrived */
            if ((revents & EPOLLIN) && rev->active) {
                ...

                rev->ready = 1;

                /* If load balancing is enabled (NGX_POST_EVENTS is set), first add
                 * the event to a deferred queue (ngx_posted_accept_events for
                 * accept events) */
                if (flags & NGX_POST_EVENTS) {
                    queue = rev->accept ? &ngx_posted_accept_events
                                        : &ngx_posted_events;

                    ngx_post_event(rev, queue);

                } else {
                    /* Otherwise call the read event's handler directly; for a new
                     * connection this is ngx_event_accept */
                    rev->handler(rev);
                }
            }

            ...
        }

        return NGX_OK;
    }
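The loop above recovers two values from event_list[i].data.ptr: the connection pointer and a one-bit instance flag kept in the pointer's lowest bit (nginx uses that flag to recognise stale events). This only works because the pointed-to structure is aligned, so the lowest address bit is always zero. A minimal standalone sketch of the trick follows; conn_t, tag_ptr, tag_bit and untag_ptr are hypothetical names used for illustration, not nginx APIs.

```c
#include <stdint.h>

/* Hypothetical stand-in for ngx_connection_t; any type with alignment >= 2 works. */
typedef struct {
    int fd;
} conn_t;

/* Pack a 1-bit flag into the lowest bit of an aligned pointer. */
static void *tag_ptr(conn_t *c, unsigned instance)
{
    return (void *) ((uintptr_t) c | (uintptr_t) (instance & 1));
}

/* Recover the flag, as the epoll loop does with "(uintptr_t) c & 1". */
static unsigned tag_bit(void *p)
{
    return (unsigned) ((uintptr_t) p & 1);
}

/* Recover the original pointer by clearing the lowest bit. */
static conn_t *untag_ptr(void *p)
{
    return (conn_t *) ((uintptr_t) p & ~(uintptr_t) 1);
}
```

The tagged value must never be dereferenced directly; it always goes through untag_ptr first, which is what the mask with ~1 does in the nginx loop.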

    ngx_event_accept mainly accepts the client connection and calls the handler registered for that listening port:

    void
    ngx_event_accept(ngx_event_t *ev)
    {
        ...

        do {
            ...

            s = accept(lc->fd, &sa.sockaddr, &socklen);

            ...

            /* Call the handler registered for this listening port; for the rtmp
             * module this is always ngx_rtmp_init_connection */
            ls->handler(c);

            ...

        } while (ev->available);
    }

    ngx_rtmp_init_connection performs a series of initializations and then begins the RTMP handshake with the client.

    From the handshake up to the first RTMP packet received after it succeeds, the steps are illustrated only with packet captures; the source is not analyzed.

    2.2 handshake

    2.2.1 hs_stage: SERVER_RECV_CHALLENGE(1)

    In this handshake stage the server waits to receive C0 and C1 from the client.

    receive: Handshake C0+C1, Figure (1)

    After receiving C0 and C1 from the client, the server enters the NGX_RTMP_HANDSHAKE_SERVER_SEND_CHALLENGE(2) stage,
    in which it sends S0 and S1.
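For reference, the RTMP specification defines C0 as a single version byte and C1 as 1536 bytes (a 4-byte timestamp, 4 zero bytes, then random data), which is why the debug log later in this article shows sends and receives of 1537 bytes for C0+C1 and S0+S1 and 1536 bytes for C2 and S2. The sketch below builds the plain layout from the specification. It is only an illustration: nginx-rtmp actually uses a digest-based variant (note the "digest found" lines in the log), and build_c0c1 and the RTMP_* macros are hypothetical names.

```c
#include <stdint.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#define RTMP_VERSION    3
#define RTMP_C1_SIZE    1536
#define RTMP_C0C1_SIZE  (1 + RTMP_C1_SIZE)

/* Fill buf (at least RTMP_C0C1_SIZE bytes) with C0 followed by C1.
 * Returns the number of bytes written. */
static size_t build_c0c1(uint8_t *buf, uint32_t epoch)
{
    size_t i;

    buf[0] = RTMP_VERSION;                   /* C0: protocol version */

    buf[1] = (uint8_t) (epoch >> 24);        /* C1 bytes 0..3: time, big-endian */
    buf[2] = (uint8_t) (epoch >> 16);
    buf[3] = (uint8_t) (epoch >> 8);
    buf[4] = (uint8_t) epoch;

    memset(buf + 5, 0, 4);                   /* C1 bytes 4..7: zero */

    for (i = 9; i < RTMP_C0C1_SIZE; i++) {   /* C1 bytes 8..1535: random data */
        buf[i] = (uint8_t) (rand() & 0xff);
    }

    return RTMP_C0C1_SIZE;
}
```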

    2.2.2 hs_stage: SERVER_SEND_CHALLENGE(2) and SERVER_SEND_RESPONSE(3)

    In the SERVER_SEND_CHALLENGE stage the server sends S0 and S1 to the client. In practice, right after sending S0 and
    S1 the server enters the SERVER_SEND_RESPONSE(3) stage and immediately sends S2 as well, so the capture looks like this:

    send: Handshake S0+S1+S2, Figure (2)

    2.2.3 hs_stage: SERVER_RECV_RESPONSE(4)

    In this stage the server waits to receive C2 from the client.

    receive: Handshake C2, Figure (3)

    At this point the RTMP handshake between server and client is complete, and normal message exchange begins.
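The stage numbers quoted in this article can be collected into one table. In the sketch below, the values 1 to 4, 7 and 10 come from this article; 5, 6, 8 and 9 are filled in from memory of the nginx-rtmp source and should be treated as assumptions to check against ngx_rtmp_handshake.c in your version. The identifiers themselves (hs_stage, hs_stage_name, hs_is_client_stage) are hypothetical.

```c
#include <stddef.h>

/* Handshake stages. 1-4, 7 and 10 appear in this article; 5, 6, 8 and 9
 * are assumptions that should be verified against the nginx-rtmp source. */
enum hs_stage {
    SERVER_RECV_CHALLENGE = 1,   /* wait for C0 + C1 */
    SERVER_SEND_CHALLENGE = 2,   /* send S0 + S1 */
    SERVER_SEND_RESPONSE  = 3,   /* send S2 */
    SERVER_RECV_RESPONSE  = 4,   /* wait for C2 */
    SERVER_DONE           = 5,
    CLIENT_SEND_CHALLENGE = 6,   /* send C0 + C1 */
    CLIENT_RECV_CHALLENGE = 7,   /* wait for S0 + S1 */
    CLIENT_RECV_RESPONSE  = 8,   /* wait for S2 */
    CLIENT_SEND_RESPONSE  = 9,   /* send C2 */
    CLIENT_DONE           = 10
};

/* Map a stage number to a printable name; index 0 is unused. */
static const char *hs_stage_name(int stage)
{
    static const char *names[] = {
        NULL,
        "SERVER_RECV_CHALLENGE", "SERVER_SEND_CHALLENGE",
        "SERVER_SEND_RESPONSE",  "SERVER_RECV_RESPONSE",
        "SERVER_DONE",
        "CLIENT_SEND_CHALLENGE", "CLIENT_RECV_CHALLENGE",
        "CLIENT_RECV_RESPONSE",  "CLIENT_SEND_RESPONSE",
        "CLIENT_DONE"
    };

    if (stage < 1 || stage > 10) {
        return "unknown";
    }

    return names[stage];
}

/* True when the stage belongs to the client side of the handshake. */
static int hs_is_client_stage(int stage)
{
    return stage >= CLIENT_SEND_CHALLENGE && stage <= CLIENT_DONE;
}
```

A helper like hs_stage_name makes "handshake: stage N" lines in the debug log easier to read when both a server-side and a client-side (relay) handshake run in the same process.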

    As the code below shows, after receiving C2 the server enters ngx_rtmp_cycle, the loop that handles the client's requests:

    static void
    ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
    {
        ngx_rtmp_free_handshake_buffers(s);

        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "handshake: done");

        if (ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE,
                                NULL, NULL) != NGX_OK)
        {
            ngx_rtmp_finalize_session(s);
            return;
        }

        ngx_rtmp_cycle(s);
    }

    ngx_rtmp_cycle resets the read and write event handlers of the current RTMP connection. When data from the client is
    detected, ngx_rtmp_recv is called to process it.

    void
    ngx_rtmp_cycle(ngx_rtmp_session_t *s)
    {
        ngx_connection_t  *c;

        c = s->connection;
        c->read->handler = ngx_rtmp_recv;
        c->write->handler = ngx_rtmp_send;

        s->ping_evt.data = c;
        s->ping_evt.log = c->log;
        s->ping_evt.handler = ngx_rtmp_ping;
        ngx_rtmp_reset_ping(s);

        ngx_rtmp_recv(c->read);
    }

    ngx_rtmp_recv loops receiving RTMP packet data from the client. Once a complete RTMP message has been received, the
    handler matching its RTMP message type is called. For example, type 20 is an AMF0 command message and is handled by
    ngx_rtmp_amf_message_handler.
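Dispatching on the message type can be pictured as a table lookup. The sketch below is a simplified illustration, not the nginx-rtmp implementation (which keeps a list of handlers per type): the type numbers are those defined by the RTMP specification, while handlers, register_handler, dispatch and on_amf0_cmd are hypothetical names.

```c
#include <stdint.h>
#include <stddef.h>

/* RTMP message type IDs as defined by the RTMP specification. */
enum {
    MSG_SET_CHUNK_SIZE     = 1,
    MSG_ABORT              = 2,
    MSG_ACK                = 3,
    MSG_USER_CONTROL       = 4,
    MSG_WINDOW_ACK_SIZE    = 5,
    MSG_SET_PEER_BANDWIDTH = 6,
    MSG_AUDIO              = 8,
    MSG_VIDEO              = 9,
    MSG_AMF3_CMD           = 17,
    MSG_AMF0_DATA          = 18,
    MSG_AMF0_CMD           = 20,
    MSG_MAX                = 32
};

typedef int (*msg_handler_pt)(const uint8_t *body, uint32_t len);

/* One slot per message type; NULL means no handler is registered. */
static msg_handler_pt handlers[MSG_MAX];

/* Example handler for AMF0 command messages (connect, publish, ...). */
static int on_amf0_cmd(const uint8_t *body, uint32_t len)
{
    (void) body;
    (void) len;
    return MSG_AMF0_CMD;
}

/* Register a handler; returns 0 on success, -1 for an out-of-range type. */
static int register_handler(uint8_t type, msg_handler_pt h)
{
    if (type >= MSG_MAX) {
        return -1;
    }
    handlers[type] = h;
    return 0;
}

/* Call the handler for a complete message; -1 means the message is ignored. */
static int dispatch(uint8_t type, const uint8_t *body, uint32_t len)
{
    if (type >= MSG_MAX || handlers[type] == NULL) {
        return -1;
    }
    return handlers[type](body, len);
}
```

A message whose type has no registered handler is simply skipped, which mirrors how a command without a callback is ignored while the server keeps waiting for the next message.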

    2.3 connect('push')

    After the handshake succeeds, the first RTMP packet received from the client is a connect to the application push{}
    block under rtmp{} in nginx.conf, as shown in the figure:

    receive: connect('push'), Figure (4)


    The figure shows that the message type is 20, an AMF0 Command, so ngx_rtmp_amf_message_handler is called to parse the
    message and then invokes the callback preregistered for the connect command, ngx_rtmp_cmd_connect_init.
    ngx_rtmp_cmd_connect_init parses the rest of the connect message and then starts the connect handler chain built
    around ngx_rtmp_connect. This chain holds the work each rtmp module wants to do for the connect command. (Note: only
    some rtmp modules register a handler for connect, and even when one is registered, the module must be enabled in the
    configuration file for its connect handling to actually run.) For the connect command, therefore, the only handler
    that does real work here is ngx_rtmp_cmd_connect, registered by ngx_rtmp_cmd_module.

    2.3.1 ngx_rtmp_cmd_connect

    static ngx_int_t
    ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v)
    {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "rtmp cmd: connect");

        ngx_rtmp_core_srv_conf_t   *cscf;
        ngx_rtmp_core_app_conf_t  **cacfp;
        ngx_uint_t                  n;
        ngx_rtmp_header_t           h;
        u_char                     *p;

        static double               trans;
        static double               capabilities = NGX_RTMP_CAPABILITIES;
        static double               object_encoding = 0;

        /* The AMF response the server returns for the client's connect command */
        static ngx_rtmp_amf_elt_t  out_obj[] = {
            { NGX_RTMP_AMF_STRING, ngx_string("fmsVer"), NGX_RTMP_FMS_VERSION, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("capabilities"), &capabilities, 0 },
        };

        static ngx_rtmp_amf_elt_t  out_inf[] = {
            { NGX_RTMP_AMF_STRING, ngx_string("level"), "status", 0 },
            { NGX_RTMP_AMF_STRING, ngx_string("code"), "NetConnection.Connect.Success", 0 },
            { NGX_RTMP_AMF_STRING, ngx_string("description"), "Connection succeeded.", 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("objectEncoding"), &object_encoding, 0 }
        };

        static ngx_rtmp_amf_elt_t  out_elts[] = {
            { NGX_RTMP_AMF_STRING, ngx_null_string, "_result", 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 },
            { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_obj, sizeof(out_obj) },
            { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_inf, sizeof(out_inf) },
        };

        if (s->connected) {
            ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                          "connect: duplicate connection");
            return NGX_ERROR;
        }

        cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

        trans = v->trans;

        /* fill session parameters */
        s->connected = 1;

        ngx_memzero(&h, sizeof(h));
        h.csid = NGX_RTMP_CSID_AMF_INI;
        h.type = NGX_RTMP_MSG_AMF_CMD;

    #define NGX_RTMP_SET_STRPAR(name)                                             \
        s->name.len = ngx_strlen(v->name);                                        \
        s->name.data = ngx_palloc(s->connection->pool, s->name.len);              \
        ngx_memcpy(s->name.data, v->name, s->name.len)

        NGX_RTMP_SET_STRPAR(app);
        NGX_RTMP_SET_STRPAR(args);
        NGX_RTMP_SET_STRPAR(flashver);
        NGX_RTMP_SET_STRPAR(swf_url);
        NGX_RTMP_SET_STRPAR(tc_url);
        NGX_RTMP_SET_STRPAR(page_url);

    #undef NGX_RTMP_SET_STRPAR

        p = ngx_strlchr(s->app.data, s->app.data + s->app.len, '?');
        if (p) {
            s->app.len = (p - s->app.data);
        }

        s->acodecs = (uint32_t) v->acodecs;
        s->vcodecs = (uint32_t) v->vcodecs;

        /* Find the configuration of the application the client connects to */
        /* find application & set app_conf */
        cacfp = cscf->applications.elts;
        for (n = 0; n < cscf->applications.nelts; ++n, ++cacfp) {
            if ((*cacfp)->name.len == s->app.len &&
                ngx_strncmp((*cacfp)->name.data, s->app.data, s->app.len) == 0)
            {
                /* found app! */
                s->app_conf = (*cacfp)->app_conf;
                break;
            }
        }

        if (s->app_conf == NULL) {
            ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                          "connect: application not found: '%V'", &s->app);
            return NGX_ERROR;
        }

        object_encoding = v->object_encoding;

        /* Send the Window Acknowledgement Size (ack_size) to the client. It tells
         * the peer the size of the acknowledgement window: after sending a
         * window's worth of data the sender waits for the peer's acknowledgement
         * (and stops sending until it arrives). The receiver must send an
         * acknowledgement once it has received a window's worth of data since
         * the session started or since its previous acknowledgement. */
        return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK ||

               /* Send Set Peer Bandwidth. This limits the peer's output bandwidth:
                * the receiver limits the amount of sent but unacknowledged data
                * to the given window. If that window differs from the last one
                * the receiver sent, it should reply with a Window
                * Acknowledgement Size message. */
               ngx_rtmp_send_bandwidth(s, cscf->ack_window,
                                       NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK ||

               /* Send Set Chunk Size, which tells the peer the new maximum chunk size. */
               ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK ||

               ngx_rtmp_send_amf(s, &h, out_elts,
                                 sizeof(out_elts) / sizeof(out_elts[0]))
               != NGX_OK ? NGX_ERROR : NGX_OK;
    }
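The three protocol control messages sent at the end of ngx_rtmp_cmd_connect have very small payloads. Per the RTMP specification, Window Acknowledgement Size (type 5) and Set Chunk Size (type 1) each carry one 4-byte big-endian integer, and Set Peer Bandwidth (type 6) carries a 4-byte window followed by a 1-byte limit type (0 hard, 1 soft, 2 dynamic). The sketch below encodes only the payloads, not the chunk headers; put_be32, encode_u32_payload and encode_peer_bandwidth are hypothetical helper names.

```c
#include <stdint.h>
#include <stddef.h>

/* Limit types carried in the last byte of Set Peer Bandwidth. */
enum { LIMIT_HARD = 0, LIMIT_SOFT = 1, LIMIT_DYNAMIC = 2 };

/* Write a 32-bit value in big-endian (network) byte order. */
static void put_be32(uint8_t *p, uint32_t v)
{
    p[0] = (uint8_t) (v >> 24);
    p[1] = (uint8_t) (v >> 16);
    p[2] = (uint8_t) (v >> 8);
    p[3] = (uint8_t) v;
}

/* Payload of Window Acknowledgement Size or Set Chunk Size:
 * a single 4-byte big-endian integer. Returns the payload length. */
static size_t encode_u32_payload(uint8_t *out, uint32_t value)
{
    put_be32(out, value);
    return 4;
}

/* Payload of Set Peer Bandwidth: 4-byte window, then 1-byte limit type. */
static size_t encode_peer_bandwidth(uint8_t *out, uint32_t window, uint8_t limit)
{
    put_be32(out, window);
    out[4] = limit;
    return 5;
}
```

With the chunk_size 4096 from the nginx.conf above, the Set Chunk Size payload is the four bytes 00 00 10 00.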

    send: ack_size, Figure (5)

    send: peer bandwidth, Figure (6)

    send: chunk_size, Figure (7)

    send: _result('NetConnection.Connect.Success'), Figure (8)
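The _result reply is an AMF0-encoded command. As an illustration of what goes on the wire, the sketch below encodes the first two elements of such a reply, the command name and the transaction number, following the AMF0 format: a string is marker 0x02, a 16-bit big-endian length and the raw bytes; a number is marker 0x00 followed by an IEEE-754 double in big-endian order. The function names are hypothetical, and the code assumes the platform's double is a 64-bit IEEE-754 value.

```c
#include <stdint.h>
#include <stddef.h>
#include <string.h>

#define AMF0_NUMBER  0x00
#define AMF0_STRING  0x02

/* Encode an AMF0 string: marker, 16-bit big-endian length, raw bytes.
 * The caller must ensure the string is shorter than 65536 bytes. */
static size_t amf0_put_string(uint8_t *out, const char *s)
{
    size_t len = strlen(s);

    out[0] = AMF0_STRING;
    out[1] = (uint8_t) (len >> 8);
    out[2] = (uint8_t) len;
    memcpy(out + 3, s, len);

    return 3 + len;
}

/* Encode an AMF0 number: marker, then the double's 8 bytes, big-endian. */
static size_t amf0_put_number(uint8_t *out, double d)
{
    uint64_t bits;
    int      i;

    memcpy(&bits, &d, sizeof(bits));   /* reinterpret the double's bit pattern */

    out[0] = AMF0_NUMBER;
    for (i = 0; i < 8; i++) {
        out[1 + i] = (uint8_t) (bits >> (56 - 8 * i));
    }

    return 9;
}
```

Encoding "_result" followed by the transaction number 1.0 yields 10 + 9 = 19 bytes; the two AMF0 objects (out_obj and out_inf in the code above) would follow.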

    2.4 releaseStream('test')

    After the server responds to the connect command, the client sends a releaseStream command message. No rtmp module in
    nginx-rtmp registers a callback for this command, so it is not processed and the server waits for the next message.

    receive: releaseStream('test'), Figure (9)

    2.5 createStream('')

    Next the server receives the createStream command message from the client.

    receive: createStream(''), Figure (10)

    As in the earlier analysis, ngx_rtmp_cmd_create_stream_init is called at this point.

    2.5.1 ngx_rtmp_cmd_create_stream_init

    static ngx_int_t
    ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
                                    ngx_chain_t *in)
    {
        static ngx_rtmp_create_stream_t  v;

        static ngx_rtmp_amf_elt_t  in_elts[] = {
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.trans, sizeof(v.trans) },
        };

        /* Parse the createStream command message to get v.trans;
         * Figure (10) shows that it is 4 */
        if (ngx_rtmp_receive_amf(s, in, in_elts,
                                 sizeof(in_elts) / sizeof(in_elts[0])))
        {
            return NGX_ERROR;
        }

        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream");

        return ngx_rtmp_create_stream(s, &v);
    }

    This function then starts the handler chain built around ngx_rtmp_create_stream. The handler reached here is
    ngx_rtmp_cmd_create_stream.

    2.5.2 ngx_rtmp_cmd_create_stream

    static ngx_int_t
    ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v)
    {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp cmd: create stream");

        /* support one message stream per connection */
        static double               stream;
        static double               trans;
        ngx_rtmp_header_t           h;

        static ngx_rtmp_amf_elt_t  out_elts[] = {
            { NGX_RTMP_AMF_STRING, ngx_null_string, "_result", 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 },
            { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &stream, sizeof(stream) },
        };

        trans = v->trans;
        stream = NGX_RTMP_MSID;

        ngx_memzero(&h, sizeof(h));

        h.csid = NGX_RTMP_CSID_AMF_INI;
        h.type = NGX_RTMP_MSG_AMF_CMD;

        return ngx_rtmp_send_amf(s, &h, out_elts,
                                 sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK ?
               NGX_DONE : NGX_ERROR;
    }

    This function mainly sends the server's response to createStream.

    send: _result()

    2.6 publish('test')

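    Next the client sends publish to the server to publish a named stream. Other clients can use this stream name to play
    the stream and receive the published audio, video and other data messages.

    receive: publish('test'), Figure (11)

    The figure shows that the publish type is 'live', i.e. the server does not save the published stream to a file.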

    2.6.1 ngx_rtmp_cmd_publish_init

    static ngx_int_t
    ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
                              ngx_chain_t *in)
    {
        static ngx_rtmp_publish_t  v;

        static ngx_rtmp_amf_elt_t  in_elts[] = {
            /* transaction is always 0 */
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, NULL, 0 },
            { NGX_RTMP_AMF_NULL, ngx_null_string, NULL, 0 },
            { NGX_RTMP_AMF_STRING, ngx_null_string, &v.name, sizeof(v.name) },
            { NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_STRING,
              ngx_null_string, &v.type, sizeof(v.type) },
        };

        ngx_memzero(&v, sizeof(v));

        /* Extract the values described by in_elts from the publish command message */
        if (ngx_rtmp_receive_amf(s, in, in_elts,
                                 sizeof(in_elts) / sizeof(in_elts[0])))
        {
            return NGX_ERROR;
        }

        ngx_rtmp_cmd_fill_args(v.name, v.args);

        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "publish: name='%s' args='%s' type=%s silent=%d",
                      v.name, v.args, v.type, v.silent);

        return ngx_rtmp_publish(s, &v);
    }

    The function then starts the handler chain built around ngx_rtmp_publish. From the nginx-rtmp source and the
    nginx.conf above, the two handlers that matter are ngx_rtmp_relay_publish and ngx_rtmp_live_publish.

    Given the ordering of the rtmp modules, ngx_rtmp_relay_publish is called first.

    2.6.2 ngx_rtmp_relay_publish

    static ngx_int_t
    ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
    {
        ngx_rtmp_relay_app_conf_t  *racf;
        ngx_rtmp_relay_target_t    *target, **t;
        ngx_str_t                   name;
        size_t                      n;
        ngx_rtmp_relay_ctx_t       *ctx;

        if (s->auto_pushed) {
            goto next;
        }

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
        if (ctx && s->relay) {
            goto next;
        }

        racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
        if (racf == NULL || racf->pushes.nelts == 0) {
            goto next;
        }

        /* v->name holds the name of the stream to publish, extracted from the
         * publish command message sent by the client */
        name.len = ngx_strlen(v->name);
        name.data = v->name;

        /* Take the first element of the pushes array and iterate over the array */
        t = racf->pushes.elts;
        for (n = 0; n < racf->pushes.nelts; ++n, ++t) {
            target = *t;

            /* If the configuration file names the stream to push, check whether
             * that name matches the stream name of the publish just received */
            if (target->name.len && (name.len != target->name.len ||
                ngx_memcmp(name.data, target->name.data, name.len)))
            {
                continue;
            }

            if (ngx_rtmp_relay_push(s, &name, target) == NGX_OK) {
                continue;
            }

            ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                          "relay: push failed name='%V' app='%V' "
                          "playpath='%V' url='%V'",
                          &name, &target->app, &target->play_path,
                          &target->url.url);

            if (!ctx->push_evt.timer_set) {
                ngx_add_timer(&ctx->push_evt, racf->push_reconnect);
            }
        }

    next:
        return next_publish(s, v);
    }

    2.6.3 ngx_rtmp_relay_push

    ngx_int_t
    ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *name,
                        ngx_rtmp_relay_target_t *target)
    {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "relay: create push name='%V' app='%V' playpath='%V' url='%V'",
                      name, &target->app, &target->play_path, &target->url.url);

        return ngx_rtmp_relay_create(s, name, target,
                                     ngx_rtmp_relay_create_local_ctx,
                                     ngx_rtmp_relay_create_remote_ctx);
    }

    2.6.4 ngx_rtmp_relay_create

    static ngx_int_t
    ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_str_t *name,
                          ngx_rtmp_relay_target_t *target,
                          ngx_rtmp_relay_create_ctx_pt create_publish_ctx,
                          ngx_rtmp_relay_create_ctx_pt create_play_ctx)
    {
        ngx_rtmp_relay_app_conf_t  *racf;
        ngx_rtmp_relay_ctx_t       *publish_ctx, *play_ctx, **cctx;
        ngx_uint_t                  hash;

        racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
        if (racf == NULL) {
            return NGX_ERROR;
        }

        /* This mainly creates a new connection to the address given in the push
         * URL, i.e. it treats that address as the upstream server and connects
         * to it */
        play_ctx = create_play_ctx(s, name, target);
        if (play_ctx == NULL) {
            return NGX_ERROR;
        }

        hash = ngx_hash_key(name->data, name->len);
        cctx = &racf->ctx[hash % racf->nbuckets];
        for (; *cctx; cctx = &(*cctx)->next) {
            if ((*cctx)->name.len == name->len
                && !ngx_memcmp(name->data, (*cctx)->name.data, name->len))
            {
                break;
            }
        }

        if (*cctx) {
            play_ctx->publish = (*cctx)->publish;
            play_ctx->next = (*cctx)->play;
            (*cctx)->play = play_ctx;
            return NGX_OK;
        }

        /* Create a local ngx_rtmp_relay_ctx_t */
        publish_ctx = create_publish_ctx(s, name, target);
        if (publish_ctx == NULL) {
            ngx_rtmp_finalize_session(play_ctx->session);
            return NGX_ERROR;
        }

        publish_ctx->publish = publish_ctx;
        publish_ctx->play = play_ctx;
        play_ctx->publish = publish_ctx;
        *cctx = publish_ctx;

        return NGX_OK;
    }
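The lookup in ngx_rtmp_relay_create is a chained hash table keyed by stream name, walked with a pointer to a pointer so that the same variable can either point at the matching entry or at the empty slot where a new entry should be linked. A standalone sketch of that pattern follows. The names relay_ctx_t, hash_key, find_slot and NBUCKETS are hypothetical, and the multiply-by-31 hash is written in the style of ngx_hash_key rather than copied from it.

```c
#include <stddef.h>
#include <string.h>

#define NBUCKETS 8

typedef struct relay_ctx_s relay_ctx_t;

struct relay_ctx_s {
    const char   *name;
    size_t        len;
    relay_ctx_t  *next;   /* next context in the same bucket */
};

/* Simple multiplicative string hash (assumed to resemble ngx_hash_key). */
static unsigned long hash_key(const char *data, size_t len)
{
    unsigned long  key = 0;
    size_t         i;

    for (i = 0; i < len; i++) {
        key = key * 31 + (unsigned char) data[i];
    }

    return key;
}

/* Return the address of the slot that holds the matching context, or the
 * address of the NULL slot at the end of the chain where a new one belongs. */
static relay_ctx_t **find_slot(relay_ctx_t **buckets, const char *name, size_t len)
{
    relay_ctx_t **cctx = &buckets[hash_key(name, len) % NBUCKETS];

    for (; *cctx; cctx = &(*cctx)->next) {
        if ((*cctx)->len == len && memcmp(name, (*cctx)->name, len) == 0) {
            break;
        }
    }

    return cctx;
}
```

When *find_slot(...) is NULL the stream has no publisher context yet, and assigning to the returned slot links a new one into the chain, which is what "*cctx = publish_ctx" does in the function above.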

    2.6.4.1 ngx_rtmp_relay_create_remote_ctx

    static ngx_rtmp_relay_ctx_t *
    ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t *name,
                                     ngx_rtmp_relay_target_t *target)
    {
        ngx_rtmp_conf_ctx_t  cctx;

        cctx.app_conf = s->app_conf;
        cctx.srv_conf = s->srv_conf;
        cctx.main_conf = s->main_conf;

        return ngx_rtmp_relay_create_connection(&cctx, name, target);
    }

    2.6.4.2 ngx_rtmp_relay_create_connection

    static ngx_rtmp_relay_ctx_t *
    ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t *name,
                                     ngx_rtmp_relay_target_t *target)
    {
        ngx_rtmp_relay_app_conf_t  *racf;
        ngx_rtmp_relay_ctx_t       *rctx;
        ngx_rtmp_addr_conf_t       *addr_conf;
        ngx_rtmp_conf_ctx_t        *addr_ctx;
        ngx_rtmp_session_t         *rs;
        ngx_peer_connection_t      *pc;
        ngx_connection_t           *c;
        ngx_addr_t                 *addr;
        ngx_pool_t                 *pool;
        ngx_int_t                   rc;
        ngx_str_t                   v, *uri;
        u_char                     *first, *last, *p;

        racf = ngx_rtmp_get_module_app_conf(cctx, ngx_rtmp_relay_module);

        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
                       "relay: create remote context");

        pool = NULL;

        /* Create a memory pool */
        pool = ngx_create_pool(4096, racf->log);
        if (pool == NULL) {
            return NULL;
        }

        /* Allocate the ngx_rtmp_relay_ctx_t structure from the pool */
        rctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t));
        if (rctx == NULL) {
            goto clear;
        }

        /* Copy the published stream name into the name member of the new
         * ngx_rtmp_relay_ctx_t */
        if (name && ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK) {
            goto clear;
        }

        /* Copy the push URL from the configuration file into the url member of
         * the new ngx_rtmp_relay_ctx_t */
        if (ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK) {
            goto clear;
        }

        /* target->tag points to the ngx_rtmp_relay_module structure */
        rctx->tag = target->tag;
        /* target->data points to the ngx_rtmp_relay_ctx_t structure that owns this data */
        rctx->data = target->data;

    #define NGX_RTMP_RELAY_STR_COPY(to, from)                                     \
        if (ngx_rtmp_relay_copy_str(pool, &rctx->to, &target->from) != NGX_OK) {  \
            goto clear;                                                           \
        }

        /* Copy the following target values into the matching members of the
         * new ngx_rtmp_relay_ctx_t */
        NGX_RTMP_RELAY_STR_COPY(app, app);
        NGX_RTMP_RELAY_STR_COPY(tc_url, tc_url);
        NGX_RTMP_RELAY_STR_COPY(page_url, page_url);
        NGX_RTMP_RELAY_STR_COPY(swf_url, swf_url);
        NGX_RTMP_RELAY_STR_COPY(flash_ver, flash_ver);
        NGX_RTMP_RELAY_STR_COPY(play_path, play_path);

        rctx->live = target->live;
        rctx->start = target->start;
        rctx->stop = target->stop;

    #undef NGX_RTMP_RELAY_STR_COPY

        /* If app (or play_path) is not yet known */
        if (rctx->app.len == 0 || rctx->play_path.len == 0) {
            /* Extract app from the push URL. Taking
             * "push rtmp://192.168.1.82:1935/live;" as the example, the
             * extracted "live" is assigned to rctx->app */
            /* parse uri */
            uri = &target->url.uri;
            first = uri->data;
            last = uri->data + uri->len;
            if (first != last && *first == '/') {
                ++first;
            }

            if (first != last) {

                /* deduce app */
                p = ngx_strlchr(first, last, '/');
                if (p == NULL) {
                    p = last;
                }

                if (rctx->app.len == 0 && first != p) {
                    /* Here v.data points to "live" */
                    v.data = first;
                    v.len = p - first;

                    /* Assign "live" to rctx->app */
                    if (ngx_rtmp_relay_copy_str(pool, &rctx->app, &v) != NGX_OK) {
                        goto clear;
                    }
                }

                /* deduce play_path */
                if (p != last) {
                    ++p;
                }

                /* If the play path is empty and p is not last (p != last would
                 * mean there is more data after "live" in
                 * "push rtmp://192.168.1.82:1935/live;", which is not the case
                 * here) */
                if (rctx->play_path.len == 0 && p != last) {
                    v.data = p;
                    v.len = last - p;

                    if (ngx_rtmp_relay_copy_str(pool, &rctx->play_path, &v)
                        != NGX_OK)
                    {
                        goto clear;
                    }
                }
            }
        }

        /* Allocate the outgoing connection structure ngx_peer_connection_t from the pool */
        pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t));
        if (pc == NULL) {
            goto clear;
        }

        if (target->url.naddrs == 0) {
            ngx_log_error(NGX_LOG_ERR, racf->log, 0,
                          "relay: no address");
            goto clear;
        }

        /* get address */
        /* Get the server address given in the push URL (the push target), e.g.
         * "192.168.1.82:1935" in "push rtmp://192.168.1.82:1935/live;" */
        addr = &target->url.addrs[target->counter % target->url.naddrs];
        target->counter++;

        /* copy log to keep shared log unchanged */
        rctx->log = *racf->log;
        pc->log = &rctx->log;
        /* With keepalive connections to the upstream, this method obtains a
         * new connection from the connection pool */
        pc->get = ngx_rtmp_relay_get_peer;
        /* With keepalive connections to the upstream, this method releases a
         * finished connection back to the connection pool */
        pc->free = ngx_rtmp_relay_free_peer;
        /* Name of the remote server; here simply the string "192.168.1.82:1935" */
        pc->name = &addr->name;
        pc->socklen = addr->socklen;
        pc->sockaddr = (struct sockaddr *) ngx_palloc(pool, pc->socklen);
        if (pc->sockaddr == NULL) {
            goto clear;
        }

        /* Copy the remote server address stored in addr->sockaddr into pc->sockaddr */
        ngx_memcpy(pc->sockaddr, addr->sockaddr, pc->socklen);

        /* Start connecting to the upstream server */
        rc = ngx_event_connect_peer(pc);
        /* As the ngx_event_connect_peer source shows, the socket is set to
         * non-blocking, so the first connect() normally cannot complete
         * immediately and the function returns NGX_AGAIN */
        if (rc != NGX_OK && rc != NGX_AGAIN) {
            ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
                           "relay: connection failed");
            goto clear;
        }

        c = pc->connection;
        c->pool = pool;
        /* The push URL */
        c->addr_text = rctx->url;

        addr_conf = ngx_pcalloc(pool, sizeof(ngx_rtmp_addr_conf_t));
        if (addr_conf == NULL) {
            goto clear;
        }

        addr_ctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_conf_ctx_t));
        if (addr_ctx == NULL) {
            goto clear;
        }

        addr_conf->ctx = addr_ctx;
        addr_ctx->main_conf = cctx->main_conf;
        addr_ctx->srv_conf = cctx->srv_conf;
        ngx_str_set(&addr_conf->addr_text, "ngx-relay");

        /* Initialize a session for this outgoing connection */
        rs = ngx_rtmp_init_session(c, addr_conf);
        if (rs == NULL) {
            /* no need to destroy pool */
            return NULL;
        }

        rs->app_conf = cctx->app_conf;
        /* Set this flag to 1 */
        rs->relay = 1;
        rctx->session = rs;
        ngx_rtmp_set_ctx(rs, rctx, ngx_rtmp_relay_module);
        ngx_str_set(&rs->flashver, "ngx-local-relay");

    #if (NGX_STAT_STUB)
        (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
    #endif

        /* Now acting as a client, start sending the handshake packet (C0 + C1)
         * to the upstream server */
        ngx_rtmp_client_handshake(rs, 1);

        return rctx;

    clear:
        if (pool) {
            ngx_destroy_pool(pool);
        }

        return NULL;
    }
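The URI handling in the middle of this function is easier to follow in isolation. The sketch below reproduces the same pointer walk on plain C strings: skip a leading '/', take everything up to the next '/' as the application name, and treat whatever follows as the play path. str_t and split_uri are hypothetical names, and unlike the nginx-rtmp code this version does not keep values that were already configured.

```c
#include <stddef.h>
#include <string.h>

/* Length-counted string slice, similar in spirit to ngx_str_t. */
typedef struct {
    const char  *data;
    size_t       len;
} str_t;

/* Split "/app/play/path" into app = "app" and play_path = "play/path".
 * An output is left with len == 0 when the URI does not contain that part. */
static void split_uri(const char *uri, size_t len, str_t *app, str_t *play_path)
{
    const char  *first = uri;
    const char  *last = uri + len;
    const char  *p;

    app->data = NULL;
    app->len = 0;
    play_path->data = NULL;
    play_path->len = 0;

    if (first != last && *first == '/') {
        ++first;                              /* skip the leading slash */
    }

    if (first == last) {
        return;                               /* nothing after the slash */
    }

    p = memchr(first, '/', (size_t) (last - first));
    if (p == NULL) {
        p = last;                             /* no play path present */
    }

    if (first != p) {
        app->data = first;
        app->len = (size_t) (p - first);
    }

    if (p != last) {
        ++p;                                  /* step over the separator */
    }

    if (p != last) {
        play_path->data = p;
        play_path->len = (size_t) (last - p);
    }
}
```

For the push URL in this article the URI part is "/live", so app becomes "live" and play_path stays empty; a URI such as "/live/test" would additionally yield the play path "test".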

    2.6.4.3 ngx_event_connect_peer

    ngx_int_t
    ngx_event_connect_peer(ngx_peer_connection_t *pc)
    {
        int                rc, type;
    #if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
        in_port_t          port;
    #endif
        ngx_int_t          event;
        ngx_err_t          err;
        ngx_uint_t         level;
        ngx_socket_t       s;
        ngx_event_t       *rev, *wev;
        ngx_connection_t  *c;

        /* This get method does not actually do anything here */
        rc = pc->get(pc, pc->data);
        if (rc != NGX_OK) {
            return rc;
        }

        type = (pc->type ? pc->type : SOCK_STREAM);

        /* Create a socket */
        s = ngx_socket(pc->sockaddr->sa_family, type, 0);

        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, "%s socket %d",
                       (type == SOCK_STREAM) ? "stream" : "dgram", s);

        if (s == (ngx_socket_t) -1) {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          ngx_socket_n " failed");
            return NGX_ERROR;
        }

        /* Get a free connection from the connection pool */
        c = ngx_get_connection(s, pc->log);

        if (c == NULL) {
            if (ngx_close_socket(s) == -1) {
                ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                              ngx_close_socket_n " failed");
            }

            return NGX_ERROR;
        }

        /* Socket type, STREAM or DGRAM; here it is STREAM */
        c->type = type;

        /* Set the receive buffer size if one was configured; as seen above, none was */
        if (pc->rcvbuf) {
            if (setsockopt(s, SOL_SOCKET, SO_RCVBUF,
                           (const void *) &pc->rcvbuf, sizeof(int)) == -1)
            {
                ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                              "setsockopt(SO_RCVBUF) failed");
                goto failed;
            }
        }

        /* Make the socket non-blocking */
        if (ngx_nonblocking(s) == -1) {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          ngx_nonblocking_n " failed");

            goto failed;
        }

        /* local holds the local address; as seen above, it is not set */
        if (pc->local) {

    #if (NGX_HAVE_TRANSPARENT_PROXY)
            if (pc->transparent) {
                if (ngx_event_connect_set_transparent(pc, s) != NGX_OK) {
                    goto failed;
                }
            }
    #endif

    #if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
            port = ngx_inet_get_port(pc->local->sockaddr);
    #endif

    #if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT)

            if (pc->sockaddr->sa_family != AF_UNIX && port == 0) {
                static int  bind_address_no_port = 1;

                if (bind_address_no_port) {
                    if (setsockopt(s, IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT,
                                   (const void *) &bind_address_no_port,
                                   sizeof(int)) == -1)
                    {
                        err = ngx_socket_errno;

                        if (err != NGX_EOPNOTSUPP && err != NGX_ENOPROTOOPT) {
                            ngx_log_error(NGX_LOG_ALERT, pc->log, err,
                                          "setsockopt(IP_BIND_ADDRESS_NO_PORT) "
                                          "failed, ignored");

                        } else {
                            bind_address_no_port = 0;
                        }
                    }
                }
            }

    #endif

    #if (NGX_LINUX)

            if (pc->type == SOCK_DGRAM && port != 0) {
                int  reuse_addr = 1;

                if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
                               (const void *) &reuse_addr, sizeof(int))
                    == -1)
                {
                    ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                                  "setsockopt(SO_REUSEADDR) failed");
                    goto failed;
                }
            }

    #endif

            if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
                ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
                              "bind(%V) failed", &pc->local->name);

                goto failed;
            }
        }

        if (type == SOCK_STREAM) {
            /* Set the I/O callbacks of this connection */
            c->recv = ngx_recv;
            c->send = ngx_send;
            c->recv_chain = ngx_recv_chain;
            c->send_chain = ngx_send_chain;

            /* Use sendfile */
            c->sendfile = 1;

            if (pc->sockaddr->sa_family == AF_UNIX) {
                c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
                c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;

    #if (NGX_SOLARIS)
                /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
                c->sendfile = 0;
    #endif
            }

        } else { /* type == SOCK_DGRAM */
            c->recv = ngx_udp_recv;
            c->send = ngx_send;
            c->send_chain = ngx_udp_send_chain;
        }

        c->log_error = pc->log_error;

        /* Set up the read and write events of this outgoing connection */
        rev = c->read;
        wev = c->write;

        rev->log = pc->log;
        wev->log = pc->log;

        pc->connection = c;

        c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

        /* Add the read and write events of this outgoing connection to epoll
         * (or whichever event mechanism is in use) */
        if (ngx_add_conn) {
            if (ngx_add_conn(c) == NGX_ERROR) {
                goto failed;
            }
        }

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
                       "connect to %V, fd:%d #%uA", pc->name, s, c->number);

        /* Connect to the upstream server. Because the socket is non-blocking,
         * connect() normally returns -1 with EINPROGRESS here instead of
         * completing */
        rc = connect(s, pc->sockaddr, pc->socklen);

        if (rc == -1) {
            err = ngx_socket_errno;

            if (err != NGX_EINPROGRESS
    #if (NGX_WIN32)
                /* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */
                && err != NGX_EAGAIN
    #endif
                )
            {
                if (err == NGX_ECONNREFUSED
    #if (NGX_LINUX)
                    /*
                     * Linux returns EAGAIN instead of ECONNREFUSED
                     * for unix sockets if listen queue is full
                     */
                    || err == NGX_EAGAIN
    #endif
                    || err == NGX_ECONNRESET
                    || err == NGX_ENETDOWN
                    || err == NGX_ENETUNREACH
                    || err == NGX_EHOSTDOWN
                    || err == NGX_EHOSTUNREACH)
                {
                    level = NGX_LOG_ERR;

                } else {
                    level = NGX_LOG_CRIT;
                }

                ngx_log_error(level, c->log, err, "connect() to %V failed",
                              pc->name);

                ngx_close_connection(c);
                pc->connection = NULL;

                return NGX_DECLINED;
            }
        }

        /* So NGX_AGAIN is returned from here */
        if (ngx_add_conn) {
            if (rc == -1) {

                /* NGX_EINPROGRESS */

                return NGX_AGAIN;
            }

            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");

            wev->ready = 1;

            return NGX_OK;
        }

        if (ngx_event_flags & NGX_USE_IOCP_EVENT) {

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, ngx_socket_errno,
                           "connect(): %d", rc);

            if (ngx_blocking(s) == -1) {
                ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                              ngx_blocking_n " failed");
                goto failed;
            }

            /*
             * FreeBSD's aio allows to post an operation on non-connected socket.
             * NT does not support it.
             *
             * TODO: check in Win32, etc. As workaround we can use NGX_ONESHOT_EVENT
             */

            rev->ready = 1;
            wev->ready = 1;

            return NGX_OK;
        }

        if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {

            /* kqueue */

            event = NGX_CLEAR_EVENT;

        } else {

            /* select, poll, /dev/poll */

            event = NGX_LEVEL_EVENT;
        }

        if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) {
            goto failed;
        }

        if (rc == -1) {

            /* NGX_EINPROGRESS */

            if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) {
                goto failed;
            }

            return NGX_AGAIN;
        }

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");

        wev->ready = 1;

        return NGX_OK;

    failed:

        ngx_close_connection(c);
        pc->connection = NULL;

        return NGX_ERROR;
    }
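Stripped of logging and platform conditionals, the outcome of the non-blocking connect() in this function falls into three cases: it completed immediately, it is still in progress (errno is EINPROGRESS, so the caller must wait for the socket to become writable), or it failed outright. The sketch below captures only that classification. CONN_OK, CONN_AGAIN, CONN_DECLINED and classify_connect are hypothetical names standing in for NGX_OK, NGX_AGAIN and NGX_DECLINED.

```c
#include <errno.h>

/* Hypothetical result codes mirroring the three outcomes discussed above. */
enum {
    CONN_OK       = 0,    /* connected immediately */
    CONN_AGAIN    = -2,   /* connection attempt still in progress */
    CONN_DECLINED = -5    /* connection attempt failed */
};

/* rc is the return value of connect() on a non-blocking socket and
 * err is the errno value captured right after the call. */
static int classify_connect(int rc, int err)
{
    if (rc == 0) {
        return CONN_OK;
    }

    if (err == EINPROGRESS) {
        /* The TCP handshake is in flight; wait for the write event. */
        return CONN_AGAIN;
    }

    /* A real failure such as ECONNREFUSED or ENETUNREACH. */
    return CONN_DECLINED;
}
```

In the CONN_AGAIN case the final result is only known later: once the socket is reported writable, getsockopt() with SO_ERROR tells whether the connection was established or failed.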

    2.6.4.4 ngx_rtmp_client_handshake

    void
    ngx_rtmp_client_handshake(ngx_rtmp_session_t *s, unsigned async)
    {
        ngx_connection_t  *c;

        c = s->connection;
        /* Set the read and write event handlers of this connection */
        c->read->handler = ngx_rtmp_handshake_recv;
        c->write->handler = ngx_rtmp_handshake_send;

        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "handshake: start client handshake");

        /* Allocate the buffer for the coming handshake; it stores the handshake
         * packets received and sent */
        s->hs_buf = ngx_rtmp_alloc_handshake_buffer(s);
        /* Set the current handshake stage: client send, C0 + C1 */
        s->hs_stage = NGX_RTMP_HANDSHAKE_CLIENT_SEND_CHALLENGE;

        /* Build the C0 + C1 packet */
        if (ngx_rtmp_handshake_create_challenge(s,
                                                ngx_rtmp_client_version,
                                                &ngx_rtmp_client_partial_key) != NGX_OK)
        {
            ngx_rtmp_finalize_session(s);
            return;
        }

        /* From the caller above, async is 1. So the handshake is not sent to the
         * upstream server yet; instead the write event is added to the timer and
         * to epoll, and C0 + C1 is sent when a later loop iteration finds the
         * write event ready */
        if (async) {
            /* Add the write event to the timer with timeout s->timeout */
            ngx_add_timer(c->write, s->timeout);
            /* Add the write event to epoll (or whichever event mechanism is in use) */
            if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
                ngx_rtmp_finalize_session(s);
            }
            return;
        }

        ngx_rtmp_handshake_send(c->write);
    }

    2.6.4.5 ngx_rtmp_relay_create_local_ctx

    static ngx_rtmp_relay_ctx_t *
    ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *name,
                                    ngx_rtmp_relay_target_t *target)
    {
        ngx_rtmp_relay_ctx_t  *ctx;

        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "relay: create local context");

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
        if (ctx == NULL) {
            ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_relay_ctx_t));
            if (ctx == NULL) {
                return NULL;
            }
            ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_relay_module);
        }

        ctx->session = s;

        ctx->push_evt.data = s;
        ctx->push_evt.log = s->connection->log;
        /* Set the handler of the push_evt event */
        ctx->push_evt.handler = ngx_rtmp_relay_push_reconnect;

        if (ctx->publish) {
            return NULL;
        }

        if (ngx_rtmp_relay_copy_str(s->connection->pool, &ctx->name, name)
            != NGX_OK)
        {
            return NULL;
        }

        return ctx;
    }

    After ngx_rtmp_relay_create_local_ctx returns, control goes all the way back to ngx_rtmp_relay_publish, which then
    runs the next handler in the chain through next_publish. Here that is ngx_rtmp_live_publish.

    2.6.5 ngx_rtmp_live_publish

    static ngx_int_t
    ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
    {
        ngx_rtmp_live_app_conf_t  *lacf;
        ngx_rtmp_live_ctx_t       *ctx;

        lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

        if (lacf == NULL || !lacf->live) {
            goto next;
        }

        ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "live: publish: name='%s' type='%s'",
                       v->name, v->type);

        /* join stream as publisher */
        /* Build an ngx_rtmp_live_ctx_t structure as the publisher */
        ngx_rtmp_live_join(s, v->name, 1);

        /* This returns the ngx_rtmp_live_ctx_t structure built just above */
        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
        if (ctx == NULL || !ctx->publishing) {
            goto next;
        }

        ctx->silent = v->silent;

        if (!ctx->silent) {
            /* Send a response to the publish the client sent earlier */
            ngx_rtmp_send_status(s, "NetStream.Publish.Start",
                                 "status", "Start publishing");
        }

    next:
        return next_publish(s, v);
    }

    send: onStatus('NetStream.Publish.Start'), Figure (12)

    Control then returns to epoll_wait to wait for monitored events. For the next part of the analysis, first look at this nginx debug output:

    ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
    ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59761
    ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:9 ev:0004 d:088F6950
    ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 9: 958705070
    ngx_send.c:ngx_unix_send:37 send: fd:9 1537 of 1537
    ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:9 op:3 ev:00002001
    ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 7
    ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:0
    ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 9: 60000:958705071
    ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:5 ev:0001 d:088F67E8
    ngx_event_accept.c:ngx_event_accept:58 accept on 0.0.0.0:1935, ready: 0
    ngx_alloc.c:ngx_memalign:66 posix_memalign: 08930870:4096 @16
    ngx_event_accept.c:ngx_event_accept:293 *3 accept: 192.168.1.82:39334 fd:10
    ngx_rtmp_init.c:ngx_rtmp_init_connection:124 *3 client connected '192.168.1.82'
    ngx_rtmp_handler.c:ngx_rtmp_set_chunk_size:823 setting chunk_size=128
    ngx_alloc.c:ngx_memalign:66 posix_memalign: 089318A0:4096 @16
    ngx_rtmp_limit_module.c:ngx_rtmp_limit_connect:87 rtmp limit: connect
    ngx_rtmp_handshake.c:ngx_rtmp_handshake:589 handshake: start server handshake
    ngx_rtmp_handshake.c:ngx_rtmp_alloc_handshake_buffer:208 handshake: allocating buffer
    ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:0
    ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 10: 60000:958705071
    ngx_epoll_module.c:ngx_epoll_add_event:625 epoll add event: fd:10 op:1 ev:80002001
    ngx_event.c:ngx_process_events_and_timers:247 timer delta: 1
    ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
    ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59760
    ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:10 ev:0001 d:088F69C8
    ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 10: 958705071
    ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
    ngx_recv.c:ngx_unix_recv:72 recv: fd:10 1537 of 1537
    ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:10 op:2 ev:00000000
    ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 2
    ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:303 handshake: peer version=14.13.0.12 epoch=958645070
    ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:320 handshake: digest found at pos=638
    ngx_send.c:ngx_unix_send:37 send: fd:10 1537 of 1537
    ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 3
    ngx_send.c:ngx_unix_send:37 send: fd:10 1536 of 1536
    ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 4
    ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
    ngx_recv.c:ngx_unix_recv:72 recv: fd:10 -1 of 1536
    ngx_recv.c:ngx_unix_recv:150 recv() not ready (11: Resource temporarily unavailable)
    ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 10: 60000:958705071
    ngx_epoll_module.c:ngx_epoll_add_event:625 epoll add event: fd:10 op:1 ev:80002001
    ngx_event.c:ngx_process_events_and_timers:247 timer delta: 0
    ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle
    ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59760
    ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:9 ev:0001 d:088F6950
    ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 9: 958705071
    ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
    ngx_recv.c:ngx_unix_recv:72 recv: fd:9 1537 of 1537
    ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:9 op:2 ev:00000000
    ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 8
    ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:303 handshake: peer version=13.10.14.13 epoch=958645071
    ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:320 handshake: digest found at pos=557
    ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1
    ngx_recv.c:ngx_unix_recv:72 recv: fd:9 1536 of 1536
    ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 9
    ngx_send.c:ngx_unix_send:37 send: fd:9 1536 of 1536
    ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 10
    ngx_rtmp_handshake.c:ngx_rtmp_handshake_done:362 handshake: done
    ngx_rtmp_relay_module.c:ngx_rtmp_relay_handshake_done:1319 rtmp relay module: handhshake done

    First, fd = 9 is the client-side STREAM socket created when connecting to the upstream server (192.168.1.82:1935),
    while fd = 5 is the STREAM listening socket created when nginx starts. The log output above therefore shows the following flow:

  • epoll reports that the socket with fd 9 is writable, so the write-event callback of that socket is invoked. As the
    earlier source code shows, this is ngx_rtmp_handshake_send, which sends the prepared C0 and C1 to the upstream
    server (192.168.1.82:1935) through the send function of the write event, ngx_unix_send. After sending, the session
    enters the CLIENT_RECV_CHALLENGE(7) stage, in which it waits for S0 and S1 from the server;
  • epoll reports that the listening socket fd:5 is readable, which means a new connection, so ngx_event_accept is called
    to accept the connection from the client (192.168.1.82:39334). After accepting it, the server uses fd:10 to talk to
    the client and then enters the handshake stage;
  • The handshake between the server (192.168.1.82:1935, fd = 10) and the client (192.168.1.82:39334, fd = 9) then
    begins. It is the same as the handshake analysed earlier, so it is not described again.
  • After the client sends C2, it enters the NGX_RTMP_HANDSHAKE_CLIENT_DONE(10) stage and then calls ngx_rtmp_handshake_done:

    static void
    ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
    {
        ngx_rtmp_free_handshake_buffers(s);

        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "handshake: done");

        if (ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE,
                                NULL, NULL) != NGX_OK)
        {
            ngx_rtmp_finalize_session(s);
            return;
        }

        ngx_rtmp_cycle(s);
    }

    Firing the NGX_RTMP_HANDSHAKE_DONE event in that function in turn calls ngx_rtmp_relay_handshake_done:

    static ngx_int_t
    ngx_rtmp_relay_handshake_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
    {
        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "rtmp relay module: handhshake done");

        ngx_rtmp_relay_ctx_t   *ctx;

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
        if (ctx == NULL || !s->relay) {
            ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                           "rtmp relay module: return");
            return NGX_OK;
        }

        /* mainly sends the connect command to the server */
        return ngx_rtmp_relay_send_connect(s);
    }
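
    The byte counts in the log above ("1537 of 1537" and "1536 of 1536") follow from the RTMP handshake layout: C0/S0 is a single version byte, and C1, C2, S1 and S2 are 1536 bytes each. The sketch below only restates that arithmetic; the names HS_VERSION_SIZE, HS_BLOCK_SIZE and hs_flight_size are hypothetical and are not part of nginx-rtmp.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical names; only the sizes come from the RTMP handshake layout. */
enum {
    HS_VERSION_SIZE = 1,     /* C0 / S0: one protocol version byte */
    HS_BLOCK_SIZE   = 1536   /* C1, C2, S1, S2: 1536 bytes each */
};

/* Size in bytes of handshake flight 1 (C0+C1 or S0+S1)
 * or flight 2 (C2 or S2). */
static size_t
hs_flight_size(int flight)
{
    assert(flight == 1 || flight == 2);

    return flight == 1 ? HS_VERSION_SIZE + HS_BLOCK_SIZE : HS_BLOCK_SIZE;
}
```

    Calling hs_flight_size(1) gives the 1537 bytes seen for C0+C1 and S0+S1, and hs_flight_size(2) gives the 1536 bytes seen for C2 and S2.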

    2.7 Client (fd = 9) sends: connect

    After a successful handshake, the client (192.168.1.82:39334, fd = 9) sends the connect command to the server.

    2.7.1 ngx_rtmp_relay_send_connect

    static ngx_int_t
    ngx_rtmp_relay_send_connect(ngx_rtmp_session_t *s)
    {
        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "rtmp relay module: send connect");

        static double               trans = NGX_RTMP_RELAY_CONNECT_TRANS;
        static double               acodecs = 3575;
        static double               vcodecs = 252;

        static ngx_rtmp_amf_elt_t   out_cmd[] = {
            { NGX_RTMP_AMF_STRING, ngx_string("app"),         NULL, 0 }, /* <-- fill */
            { NGX_RTMP_AMF_STRING, ngx_string("tcUrl"),       NULL, 0 }, /* <-- fill */
            { NGX_RTMP_AMF_STRING, ngx_string("pageUrl"),     NULL, 0 }, /* <-- fill */
            { NGX_RTMP_AMF_STRING, ngx_string("swfUrl"),      NULL, 0 }, /* <-- fill */
            { NGX_RTMP_AMF_STRING, ngx_string("flashVer"),    NULL, 0 }, /* <-- fill */
            { NGX_RTMP_AMF_NUMBER, ngx_string("audioCodecs"), &acodecs, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("videoCodecs"), &vcodecs, 0 }
        };

        static ngx_rtmp_amf_elt_t   out_elts[] = {
            { NGX_RTMP_AMF_STRING, ngx_null_string, "connect", 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 },
            { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_cmd, sizeof(out_cmd) }
        };

        ngx_rtmp_core_app_conf_t   *cacf;
        ngx_rtmp_core_srv_conf_t   *cscf;
        ngx_rtmp_relay_ctx_t       *ctx;
        ngx_rtmp_header_t           h;
        size_t                      len, url_len;
        u_char                     *p, *url_end;

        cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_core_module);
        cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
        if (cacf == NULL || ctx == NULL) {
            return NGX_ERROR;
        }

        /* app */
        if (ctx->app.len) {
            out_cmd[0].data = ctx->app.data;
            out_cmd[0].len  = ctx->app.len;
        } else {
            out_cmd[0].data = cacf->name.data;
            out_cmd[0].len  = cacf->name.len;
        }

        /* tcUrl */
        if (ctx->tc_url.len) {
            out_cmd[1].data = ctx->tc_url.data;
            out_cmd[1].len  = ctx->tc_url.len;
        } else {
            len = sizeof("rtmp://") - 1 + ctx->url.len +
                  sizeof("/") - 1 + ctx->app.len;
            p = ngx_palloc(s->connection->pool, len);
            if (p == NULL) {
                return NGX_ERROR;
            }
            out_cmd[1].data = p;
            p = ngx_cpymem(p, "rtmp://", sizeof("rtmp://") - 1);

            url_len = ctx->url.len;
            url_end = ngx_strlchr(ctx->url.data, ctx->url.data + ctx->url.len, '/');
            if (url_end) {
                url_len = (size_t) (url_end - ctx->url.data);
            }

            p = ngx_cpymem(p, ctx->url.data, url_len);
            *p++ = '/';
            p = ngx_cpymem(p, ctx->app.data, ctx->app.len);
            out_cmd[1].len = p - (u_char *)out_cmd[1].data;
        }

        /* pageUrl */
        out_cmd[2].data = ctx->page_url.data;
        out_cmd[2].len  = ctx->page_url.len;

        /* swfUrl */
        out_cmd[3].data = ctx->swf_url.data;
        out_cmd[3].len  = ctx->swf_url.len;

        /* flashVer */
        if (ctx->flash_ver.len) {
            out_cmd[4].data = ctx->flash_ver.data;
            out_cmd[4].len  = ctx->flash_ver.len;
        } else {
            out_cmd[4].data = NGX_RTMP_RELAY_FLASHVER;
            out_cmd[4].len  = sizeof(NGX_RTMP_RELAY_FLASHVER) - 1;
        }

        ngx_memzero(&h, sizeof(h));
        h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
        h.type = NGX_RTMP_MSG_AMF_CMD;

        return ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK
            || ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK
            || ngx_rtmp_send_amf(s, &h, out_elts,
                                 sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK
            ? NGX_ERROR
            : NGX_OK;
    }

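    After sending these RTMP packets (chunk_size, ack_size and the connect command), control returns to epoll_wait to wait for further events.

    The analysis below distinguishes one server and two clients:

    • Server: 192.168.1.82:1935
    • Client: obs (pushing the stream)
    • Client: 192.168.1.82:xxxx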

    2.8 Server receives from client obs: amf_meta(18)

    At this point the server detects an rtmp message of type amf_meta(18) sent by the obs client.

    receive: @setDataFrame(meta_data 18), Figure (13)

    For "@setDataFrame", only the ngx_rtmp_codec_module module registers a callback, namely ngx_rtmp_codec_meta_data:

    2.8.1 ngx_rtmp_codec_meta_data

    static ngx_int_t
    ngx_rtmp_codec_meta_data(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
    {
        ngx_rtmp_codec_app_conf_t      *cacf;
        ngx_rtmp_codec_ctx_t           *ctx;
        ngx_uint_t                      skip;

        static struct {
            double                      width;
            double                      height;
            double                      duration;
            double                      frame_rate;
            double                      video_data_rate;
            double                      video_codec_id_n;
            u_char                      video_codec_id_s[32];
            double                      audio_data_rate;
            double                      audio_codec_id_n;
            u_char                      audio_codec_id_s[32];
            u_char                      profile[32];
            u_char                      level[32];
        }                               v;

        static ngx_rtmp_amf_elt_t       in_video_codec_id[] = {
            { NGX_RTMP_AMF_NUMBER, ngx_null_string,
              &v.video_codec_id_n, 0 },
            { NGX_RTMP_AMF_STRING, ngx_null_string,
              &v.video_codec_id_s, sizeof(v.video_codec_id_s) },
        };

        static ngx_rtmp_amf_elt_t       in_audio_codec_id[] = {
            { NGX_RTMP_AMF_NUMBER, ngx_null_string,
              &v.audio_codec_id_n, 0 },
            { NGX_RTMP_AMF_STRING, ngx_null_string,
              &v.audio_codec_id_s, sizeof(v.audio_codec_id_s) },
        };

        static ngx_rtmp_amf_elt_t       in_inf[] = {
            { NGX_RTMP_AMF_NUMBER,  ngx_string("width"),         &v.width, 0 },
            { NGX_RTMP_AMF_NUMBER,  ngx_string("height"),        &v.height, 0 },
            { NGX_RTMP_AMF_NUMBER,  ngx_string("duration"),      &v.duration, 0 },
            { NGX_RTMP_AMF_NUMBER,  ngx_string("framerate"),     &v.frame_rate, 0 },
            { NGX_RTMP_AMF_NUMBER,  ngx_string("fps"),           &v.frame_rate, 0 },
            { NGX_RTMP_AMF_NUMBER,  ngx_string("videodatarate"), &v.video_data_rate, 0 },
            { NGX_RTMP_AMF_VARIANT, ngx_string("videocodecid"),
              in_video_codec_id, sizeof(in_video_codec_id) },
            { NGX_RTMP_AMF_NUMBER,  ngx_string("audiodatarate"), &v.audio_data_rate, 0 },
            { NGX_RTMP_AMF_VARIANT, ngx_string("audiocodecid"),
              in_audio_codec_id, sizeof(in_audio_codec_id) },
            { NGX_RTMP_AMF_STRING,  ngx_string("profile"),
              &v.profile, sizeof(v.profile) },
            { NGX_RTMP_AMF_STRING,  ngx_string("level"),
              &v.level, sizeof(v.level) },
        };

        static ngx_rtmp_amf_elt_t       in_elts[] = {
            { NGX_RTMP_AMF_STRING, ngx_null_string, NULL, 0 },
            { NGX_RTMP_AMF_OBJECT, ngx_null_string, in_inf, sizeof(in_inf) },
        };

        cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_codec_module);

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
        if (ctx == NULL) {
            ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_codec_ctx_t));
            ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_codec_module);
        }

        ngx_memzero(&v, sizeof(v));

        /* use -1 as a sign of unchanged data;
         * 0 is a valid value for uncompressed audio */
        v.audio_codec_id_n = -1;

        /* FFmpeg sends a string in front of actual metadata; ignore it */
        skip = !(in->buf->last > in->buf->pos
                 && *in->buf->pos == NGX_RTMP_AMF_STRING);
        if (ngx_rtmp_receive_amf(s, in, in_elts + skip,
                                 sizeof(in_elts) / sizeof(in_elts[0]) - skip))
        {
            ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                          "codec: error parsing data frame");
            return NGX_OK;
        }

        ctx->width = (ngx_uint_t) v.width;
        ctx->height = (ngx_uint_t) v.height;
        ctx->duration = (ngx_uint_t) v.duration;
        ctx->frame_rate = (ngx_uint_t) v.frame_rate;
        ctx->video_data_rate = (ngx_uint_t) v.video_data_rate;
        ctx->video_codec_id = (ngx_uint_t) v.video_codec_id_n;
        ctx->audio_data_rate = (ngx_uint_t) v.audio_data_rate;
        ctx->audio_codec_id = (v.audio_codec_id_n == -1
                ? 0 : v.audio_codec_id_n == 0
                ? NGX_RTMP_AUDIO_UNCOMPRESSED : (ngx_uint_t) v.audio_codec_id_n);
        ngx_memcpy(ctx->profile, v.profile, sizeof(v.profile));
        ngx_memcpy(ctx->level, v.level, sizeof(v.level));

        ngx_log_debug8(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "codec: data frame: "
                       "width=%ui height=%ui duration=%ui frame_rate=%ui "
                       "video=%s (%ui) audio=%s (%ui)",
                       ctx->width, ctx->height, ctx->duration, ctx->frame_rate,
                       ngx_rtmp_get_video_codec_name(ctx->video_codec_id),
                       ctx->video_codec_id,
                       ngx_rtmp_get_audio_codec_name(ctx->audio_codec_id),
                       ctx->audio_codec_id);

        switch (cacf->meta) {
            case NGX_RTMP_CODEC_META_ON:    /* meta is initialized to this value */
                return ngx_rtmp_codec_reconstruct_meta(s);
            case NGX_RTMP_CODEC_META_COPY:
                return ngx_rtmp_codec_copy_meta(s, h, in);
        }

        /* NGX_RTMP_CODEC_META_OFF */

        return NGX_OK;
    }

    This function mainly parses the setDataFrame data and then, since meta is initialized to NGX_RTMP_CODEC_META_ON, calls ngx_rtmp_codec_reconstruct_meta.

    2.8.2 ngx_rtmp_codec_reconstruct_meta

    static ngx_int_t
    ngx_rtmp_codec_reconstruct_meta(ngx_rtmp_session_t *s)
    {
        ngx_rtmp_codec_ctx_t           *ctx;
        ngx_rtmp_core_srv_conf_t       *cscf;
        ngx_int_t                       rc;

        static struct {
            double                      width;
            double                      height;
            double                      duration;
            double                      frame_rate;
            double                      video_data_rate;
            double                      video_codec_id;
            double                      audio_data_rate;
            double                      audio_codec_id;
            u_char                      profile[32];
            u_char                      level[32];
        }                               v;

        static ngx_rtmp_amf_elt_t       out_inf[] = {
            { NGX_RTMP_AMF_STRING, ngx_string("Server"),
              "NGINX RTMP (github.com/arut/nginx-rtmp-module)", 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("width"),         &v.width, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("height"),        &v.height, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("displayWidth"),  &v.width, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("displayHeight"), &v.height, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("duration"),      &v.duration, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("framerate"),     &v.frame_rate, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("fps"),           &v.frame_rate, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("videodatarate"), &v.video_data_rate, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("videocodecid"),  &v.video_codec_id, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("audiodatarate"), &v.audio_data_rate, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("audiocodecid"),  &v.audio_codec_id, 0 },
            { NGX_RTMP_AMF_STRING, ngx_string("profile"),
              &v.profile, sizeof(v.profile) },
            { NGX_RTMP_AMF_STRING, ngx_string("level"),
              &v.level, sizeof(v.level) },
        };

        static ngx_rtmp_amf_elt_t       out_elts[] = {
            { NGX_RTMP_AMF_STRING, ngx_null_string, "onMetaData", 0 },
            { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_inf, sizeof(out_inf) },
        };

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
        if (ctx == NULL) {
            return NGX_OK;
        }

        cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

        if (ctx->meta) {
            ngx_rtmp_free_shared_chain(cscf, ctx->meta);
            ctx->meta = NULL;
        }

        v.width = ctx->width;
        v.height = ctx->height;
        v.duration = ctx->duration;
        v.frame_rate = ctx->frame_rate;
        v.video_data_rate = ctx->video_data_rate;
        v.video_codec_id = ctx->video_codec_id;
        v.audio_data_rate = ctx->audio_data_rate;
        v.audio_codec_id = ctx->audio_codec_id;
        ngx_memcpy(v.profile, ctx->profile, sizeof(ctx->profile));
        ngx_memcpy(v.level, ctx->level, sizeof(ctx->level));

        rc = ngx_rtmp_append_amf(s, &ctx->meta, NULL, out_elts,
                                 sizeof(out_elts) / sizeof(out_elts[0]));
        if (rc != NGX_OK || ctx->meta == NULL) {
            return NGX_ERROR;
        }

        return ngx_rtmp_codec_prepare_meta(s, 0);
    }

    2.8.3 ngx_rtmp_codec_prepare_meta

    static ngx_int_t
    ngx_rtmp_codec_prepare_meta(ngx_rtmp_session_t *s, uint32_t timestamp)
    {
        ngx_rtmp_header_t      h;
        ngx_rtmp_codec_ctx_t  *ctx;

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);

        ngx_memzero(&h, sizeof(h));
        h.csid = NGX_RTMP_CSID_AMF;
        h.msid = NGX_RTMP_MSID;
        h.type = NGX_RTMP_MSG_AMF_META;
        h.timestamp = timestamp;

        /* build the complete rtmp message */
        ngx_rtmp_prepare_message(s, &h, NULL, ctx->meta);

        ctx->meta_version = ngx_rtmp_codec_get_next_version();

        return NGX_OK;
    }

    2.9 Server receives from client (192.168.1.82:xxx): chunk_size(1)

    The server receives the Set Chunk Size message sent by the client and calls ngx_rtmp_set_chunk_size to apply the new chunk size.

    2.9.1 ngx_rtmp_set_chunk_size

    ngx_int_t
    ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, ngx_uint_t size)
    {
        ngx_rtmp_core_srv_conf_t           *cscf;
        ngx_chain_t                        *li, *fli, *lo, *flo;
        ngx_buf_t                          *bi, *bo;
        ngx_int_t                           n;

        ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "setting chunk_size=%ui", size);

        if (size > NGX_RTMP_MAX_CHUNK_SIZE) {
            ngx_log_error(NGX_LOG_ALERT, s->connection->log, 0,
                          "too big RTMP chunk size:%ui", size);
            return NGX_ERROR;
        }

        cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

        s->in_old_pool = s->in_pool;
        s->in_chunk_size = size;
        s->in_pool = ngx_create_pool(4096, s->connection->log);

        /* copy existing chunk data */
        if (s->in_old_pool) {
            s->in_chunk_size_changing = 1;
            s->in_streams[0].in = NULL;

            for(n = 1; n < cscf->max_streams; ++n) {
                /* stream buffer is circular
                 * for all streams except for the current one
                 * (which caused this chunk size change);
                 * we can simply ignore it */
                li = s->in_streams[n].in;
                if (li == NULL || li->next == NULL) {
                    s->in_streams[n].in = NULL;
                    continue;
                }

                /* move from last to the first */
                li = li->next;
                fli = li;
                lo = ngx_rtmp_alloc_in_buf(s);
                if (lo == NULL) {
                    return NGX_ERROR;
                }
                flo = lo;

                for ( ;; ) {
                    bi = li->buf;
                    bo = lo->buf;

                    if (bo->end - bo->last >= bi->last - bi->pos) {
                        bo->last = ngx_cpymem(bo->last, bi->pos,
                                              bi->last - bi->pos);
                        li = li->next;
                        if (li == fli) {
                            lo->next = flo;
                            s->in_streams[n].in = lo;
                            break;
                        }
                        continue;
                    }

                    bi->pos += (ngx_cpymem(bo->last, bi->pos,
                                           bo->end - bo->last) - bo->last);
                    lo->next = ngx_rtmp_alloc_in_buf(s);
                    lo = lo->next;
                    if (lo == NULL) {
                        return NGX_ERROR;
                    }
                }
            }
        }

        return NGX_OK;
    }
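
    To see why the chunk size matters, it can help to count how many chunks one message needs. The sketch below is not nginx-rtmp code: rtmp_chunk_count is a hypothetical helper that assumes every chunk carries at most chunk_size payload bytes and ignores chunk headers. The values 128 (the initial size in the log, "setting chunk_size=128") and 4096 (chunk_size in nginx.conf) come from this article.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper: number of chunks needed to carry msg_len payload
 * bytes when each chunk holds at most chunk_size payload bytes.
 * Chunk headers are ignored. */
static size_t
rtmp_chunk_count(size_t msg_len, size_t chunk_size)
{
    assert(chunk_size > 0);

    /* integer ceiling of msg_len / chunk_size */
    return (msg_len + chunk_size - 1) / chunk_size;
}
```

    Under these assumptions a 300-byte message takes three chunks at the initial size of 128 but a single chunk once the size is raised to 4096.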

    2.10 Server receives from client (192.168.1.82:xxx): ack_size(5)

    The server receives the message from the client that sets the acknowledgement window size.

    2.10.1 ngx_rtmp_protocol_message_handler

    ngx_int_t
    ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
        ngx_rtmp_header_t *h, ngx_chain_t *in)
    {
        ...
        switch(h->type) {
            ...
            case NGX_RTMP_MSG_ACK_SIZE:
                /* receive window size =val */
                ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                               "receive ack_size=%uD", val);

                /* simply store the acknowledgement window size */
                s->ack_size = val;
                break;
            ...
        }
    }
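
    The handler above only stores the window size. How a receiver might then use it can be sketched as follows, assuming the usual RTMP rule that an acknowledgement is due once a window's worth of bytes has arrived since the last one. ack_state_t and ack_on_receive are hypothetical names for illustration and do not mirror the module's session structure.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical receiver-side counters (not the module's real struct). */
typedef struct {
    uint32_t  ack_size;     /* window size announced by the peer */
    uint32_t  in_bytes;     /* total bytes received so far */
    uint32_t  in_last_ack;  /* in_bytes at the last acknowledgement */
} ack_state_t;

/* Account for n newly received bytes. Returns 1 when an acknowledgement
 * is due (and records it as sent), otherwise 0. */
static int
ack_on_receive(ack_state_t *st, uint32_t n)
{
    assert(st != NULL);

    st->in_bytes += n;

    if (st->ack_size && st->in_bytes - st->in_last_ack >= st->ack_size) {
        st->in_last_ack = st->in_bytes;
        return 1;
    }

    return 0;
}
```

    With a window of 1000 bytes, two reads of 600 bytes trigger one acknowledgement on the second read, after which counting restarts from the 1200 bytes received so far.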

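    2.11 Server receives from client (192.168.1.82:xxx): amf_cmd(20), connect

    The server receives the connect command sent by the client. The app this client wants to connect to is live.
    No packet capture is available for this step, so only the log output can be shown: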

    AMF read (1) 00 '?'
    AMF read (8) 3F F0 00 00 00 00 00 00 '????????'
    AMF read (1) 03 '?'
    AMF read (2) 00 03 '??'
    AMF read (3) 61 70 70 'app'
    AMF read (1) 02 '?'
    AMF read (2) 00 04 '??'
    AMF read (4) 6C 69 76 65 'live'
    AMF read (2) 00 05 '??'
    AMF read (5) 74 63 55 72 6C 'tcUrl'
    AMF read (1) 02 '?'
    AMF read (2) 00 1D '??'
    AMF read (29) 72 74 6D 70 3A 2F 2F 31 39 32 2E 31 36 38 2E 31 'rtmp://192.168.1'
    AMF read (2) 00 07 '??'
    AMF read (7) 70 61 67 65 55 72 6C 'pageUrl'
    AMF read (1) 02 '?'
    AMF read (2) 00 00 '??'
    AMF read (2) 00 06 '??'
    AMF read (6) 73 77 66 55 72 6C 'swfUrl'
    AMF read (1) 02 '?'
    AMF read (2) 00 00 '??'
    AMF read (2) 00 08 '??'
    AMF read (8) 66 6C 61 73 68 56 65 72 'flashVer'
    AMF read (1) 02 '?'
    AMF read (2) 00 0F '??'
    AMF read (15) 4C 4E 58 2E 31 31 2C 31 2C 31 30 32 2C 35 35 'LNX.11,1,102,55'
    AMF read (2) 00 0B '??'
    AMF read (11) 61 75 64 69 6F 43 6F 64 65 63 73 'audioCodecs'
    AMF read (1) 00 '?'
    AMF read (8) 40 AB EE 00 00 00 00 00 '@???????'
    AMF read (2) 00 0B '??'
    AMF read (11) 76 69 64 65 6F 43 6F 64 65 63 73 'videoCodecs'
    AMF read (1) 00 '?'
    AMF read (8) 40 6F 80 00 00 00 00 00 '@o??????'
    AMF read (2) 00 00 '??'
    AMF read (1) 09 '?'
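
    The 8-byte values in this dump are AMF0 numbers: a type byte 00 followed by a big-endian IEEE-754 double. A minimal sketch of decoding them is given below; amf0_read_number is a hypothetical helper, not the module's AMF reader, and it assumes that double is a 64-bit IEEE-754 type on the host.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical helper: decode the 8-byte big-endian payload of an
 * AMF0 number. Assumes double is a 64-bit IEEE-754 type. */
static double
amf0_read_number(const uint8_t b[8])
{
    uint64_t  bits = 0;
    double    v;
    int       i;

    assert(sizeof(v) == sizeof(bits));

    /* assemble the bytes most-significant first */
    for (i = 0; i < 8; i++) {
        bits = (bits << 8) | b[i];
    }

    memcpy(&v, &bits, sizeof(v));

    return v;
}
```

    Applied to the dump, 3F F0 00 ... decodes to 1.0 (the transaction number of connect), 40 AB EE 00 ... to 3575 (audioCodecs) and 40 6F 80 00 ... to 252 (videoCodecs), which match the acodecs and vcodecs values set in ngx_rtmp_relay_send_connect.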

    2.11.1 ngx_rtmp_cmd_connect

    static ngx_int_t
    ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v)
    {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "rtmp cmd: connect");

        ngx_rtmp_core_srv_conf_t   *cscf;
        ngx_rtmp_core_app_conf_t  **cacfp;
        ngx_uint_t                  n;
        ngx_rtmp_header_t           h;
        u_char                     *p;

        static double               trans;
        static double               capabilities = NGX_RTMP_CAPABILITIES;
        static double               object_encoding = 0;

        /* the following is the AMF response the server will return
         * for the client's connect command */
        static ngx_rtmp_amf_elt_t  out_obj[] = {
            { NGX_RTMP_AMF_STRING, ngx_string("fmsVer"),
              NGX_RTMP_FMS_VERSION, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("capabilities"),
              &capabilities, 0 },
        };

        static ngx_rtmp_amf_elt_t  out_inf[] = {
            { NGX_RTMP_AMF_STRING, ngx_string("level"),
              "status", 0 },
            { NGX_RTMP_AMF_STRING, ngx_string("code"),
              "NetConnection.Connect.Success", 0 },
            { NGX_RTMP_AMF_STRING, ngx_string("description"),
              "Connection succeeded.", 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_string("objectEncoding"),
              &object_encoding, 0 }
        };

        static ngx_rtmp_amf_elt_t  out_elts[] = {
            { NGX_RTMP_AMF_STRING, ngx_null_string, "_result", 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 },
            { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_obj, sizeof(out_obj) },
            { NGX_RTMP_AMF_OBJECT, ngx_null_string, out_inf, sizeof(out_inf) },
        };

        if (s->connected) {
            ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                          "connect: duplicate connection");
            return NGX_ERROR;
        }

        cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

        trans = v->trans;

        /* fill session parameters */
        s->connected = 1;

        ngx_memzero(&h, sizeof(h));
        h.csid = NGX_RTMP_CSID_AMF_INI;
        h.type = NGX_RTMP_MSG_AMF_CMD;

    #define NGX_RTMP_SET_STRPAR(name)                                             \
        s->name.len = ngx_strlen(v->name);                                        \
        s->name.data = ngx_palloc(s->connection->pool, s->name.len);              \
        ngx_memcpy(s->name.data, v->name, s->name.len)

        NGX_RTMP_SET_STRPAR(app);
        NGX_RTMP_SET_STRPAR(args);
        NGX_RTMP_SET_STRPAR(flashver);
        NGX_RTMP_SET_STRPAR(swf_url);
        NGX_RTMP_SET_STRPAR(tc_url);
        NGX_RTMP_SET_STRPAR(page_url);

    #undef NGX_RTMP_SET_STRPAR

        p = ngx_strlchr(s->app.data, s->app.data + s->app.len, '?');
        if (p) {
            s->app.len = (p - s->app.data);
        }

        s->acodecs = (uint32_t) v->acodecs;
        s->vcodecs = (uint32_t) v->vcodecs;

        /* find the configuration of the application the client connects to */
        /* find application & set app_conf */
        cacfp = cscf->applications.elts;
        for(n = 0; n < cscf->applications.nelts; ++n, ++cacfp) {
            if ((*cacfp)->name.len == s->app.len &&
                ngx_strncmp((*cacfp)->name.data, s->app.data, s->app.len) == 0)
            {
                /* found app! */
                s->app_conf = (*cacfp)->app_conf;
                break;
            }
        }

        if (s->app_conf == NULL) {
            ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                          "connect: application not found: '%V'", &s->app);
            return NGX_ERROR;
        }

        object_encoding = v->object_encoding;

        /* Send the acknowledgement window size (ack_size) to the client.
         * This message tells the peer the size of the acknowledgement window:
         * after sending that many bytes, the sender waits for the peer's
         * acknowledgement (and stops sending until it arrives). The receiver
         * must send an acknowledgement once it has received a window's worth
         * of data since its last acknowledgement, or since the start of the
         * session. */
        return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK ||

        /* Send the Set Peer Bandwidth message. It tells the peer its outgoing
         * bandwidth limit; the receiver uses it to limit its own output, i.e.
         * the amount of data sent but not yet acknowledged. A peer that
         * receives this message should reply with a window acknowledgement
         * size message if the window size differs from the one it last sent. */
               ngx_rtmp_send_bandwidth(s, cscf->ack_window,
                                       NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK ||

        /* Send the Set Chunk Size message, which tells the peer the new
         * maximum chunk size. */
               ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK ||

               ngx_rtmp_send_amf(s, &h, out_elts,
                                 sizeof(out_elts) / sizeof(out_elts[0]))
               != NGX_OK ? NGX_ERROR : NGX_OK;
    }

    Here the server sends the client (192.168.1.82:xxxx) four packets: ack_size, bandwidth, chunk_size and the response to connect.

    2.12 Client (192.168.1.82:xxx) receives from server: ack_size(5)

    The client receives the message from the server that sets the acknowledgement window size.
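
    The application lookup inside ngx_rtmp_cmd_connect can be illustrated in isolation: any "?args" suffix is cut off the app name, and the remainder is matched against the configured applications by length and content. The sketch below uses plain C strings instead of ngx_str_t, and app_name_len and find_app are hypothetical names for illustration only.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: length of the application name with any
 * "?args" suffix removed. */
static size_t
app_name_len(const char *app, size_t len)
{
    const char *q = memchr(app, '?', len);

    return q ? (size_t) (q - app) : len;
}

/* Hypothetical helper: index of the configured application matching
 * app[0..len), or -1 when there is none. */
static int
find_app(const char *const names[], size_t n, const char *app, size_t len)
{
    size_t  i;

    assert(names != NULL && app != NULL);

    len = app_name_len(app, len);

    for (i = 0; i < n; i++) {
        /* same test as the original loop: equal length, equal bytes */
        if (strlen(names[i]) == len && strncmp(names[i], app, len) == 0) {
            return (int) i;
        }
    }

    return -1;
}
```

    With the two applications from nginx.conf, "live" and "push", a connect to "push?key=1" resolves to push, while an unknown name yields -1, which corresponds to the "application not found" error path.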

    2.12.1 ngx_rtmp_protocol_message_handler

    ngx_int_t
    ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
        ngx_rtmp_header_t *h, ngx_chain_t *in)
    {
        ...
        switch(h->type) {
            ...
            case NGX_RTMP_MSG_ACK_SIZE:
                /* receive window size =val */
                ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                               "receive ack_size=%uD", val);

                /* simply store the acknowledgement window size */
                s->ack_size = val;
                break;
            ...
        }
    }

    2.13 Client (192.168.1.82:xxx) receives from server: bandwidth(6)

    The client receives the Set Peer Bandwidth message sent by the server.

    2.13.1 ngx_rtmp_protocol_message_handler

    ngx_int_t
    ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
        ngx_rtmp_header_t *h, ngx_chain_t *in)
    {
        ...
        switch(h->type) {
            ...
            case NGX_RTMP_MSG_BANDWIDTH:
                if (b->last - b->pos >= 5) {
                    limit = *(uint8_t*)&b->pos[4];

                    (void)val;
                    (void)limit;

                    ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                                   "receive bandwidth=%uD limit=%d",
                                   val, (int)limit);

                    /* receive window size =val
                     * && limit */
                }
                break;
            ...
        }
    }
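
    The handler reads the limit type from byte 4 of the payload, which matches the 5-byte layout of a Set Peer Bandwidth message: a 4-byte big-endian window size followed by a 1-byte limit type (0 hard, 1 soft, 2 dynamic). A minimal standalone parser could look like this; peer_bw_t and parse_peer_bw are hypothetical names, and the sample payload in the note below is made up for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical result type for a parsed Set Peer Bandwidth payload. */
typedef struct {
    uint32_t  window;  /* window size in bytes */
    uint8_t   limit;   /* limit type: 0 hard, 1 soft, 2 dynamic */
} peer_bw_t;

/* Parse a 5-byte payload. Returns 0 on success, -1 if it is too short. */
static int
parse_peer_bw(const uint8_t *p, size_t len, peer_bw_t *out)
{
    assert(p != NULL && out != NULL);

    if (len < 5) {
        return -1;
    }

    /* 4-byte big-endian window size */
    out->window = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16)
                | ((uint32_t) p[2] << 8)  |  (uint32_t) p[3];

    /* 1-byte limit type */
    out->limit = p[4];

    return 0;
}
```

    For the sample payload 00 4C 4B 40 02 this yields a window of 5000000 bytes with the dynamic limit type, the type the server passes as NGX_RTMP_LIMIT_DYNAMIC in ngx_rtmp_cmd_connect.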

    2.13 Client (192.168.1.82:xxx) receives from server: chunk_size(1)

    The client receives the Set Chunk Size message sent by the server and therefore calls ngx_rtmp_set_chunk_size to apply it.

    2.13 Client (192.168.1.82:xxx) receives from server: amf_cmd(20), _result()
    The client receives the server's response to connect: _result(NetConnection.Connect.Success).

    2.13.1 ngx_rtmp_relay_on_result

    static ngx_int_t
    ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
    {
        ngx_rtmp_relay_ctx_t       *ctx;

        static struct {
            double                  trans;
            u_char                  level[32];
            u_char                  code[128];
            u_char                  desc[1024];
        } v;

        static ngx_rtmp_amf_elt_t   in_inf[] = {
            { NGX_RTMP_AMF_STRING, ngx_string("level"),
              &v.level, sizeof(v.level) },
            { NGX_RTMP_AMF_STRING, ngx_string("code"),
              &v.code, sizeof(v.code) },
            { NGX_RTMP_AMF_STRING, ngx_string("description"),
              &v.desc, sizeof(v.desc) },
        };

        static ngx_rtmp_amf_elt_t   in_elts[] = {
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.trans, 0 },
            { NGX_RTMP_AMF_NULL,   ngx_null_string, NULL, 0 },
            { NGX_RTMP_AMF_OBJECT, ngx_null_string, in_inf, sizeof(in_inf) },
        };

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
        if (ctx == NULL || !s->relay) {
            return NGX_OK;
        }

        ngx_memzero(&v, sizeof(v));
        if (ngx_rtmp_receive_amf(s, in, in_elts,
                                 sizeof(in_elts) / sizeof(in_elts[0])))
        {
            return NGX_ERROR;
        }

        ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "relay: _result: level='%s' code='%s' description='%s'",
                       v.level, v.code, v.desc);

        switch ((ngx_int_t)v.trans) {
            case NGX_RTMP_RELAY_CONNECT_TRANS:
                /* send the createStream command to the server */
                return ngx_rtmp_relay_send_create_stream(s);

            case NGX_RTMP_RELAY_CREATE_STREAM_TRANS:
                if (ctx->publish != ctx && !s->static_relay) {
                    if (ngx_rtmp_relay_send_publish(s) != NGX_OK) {
                        return NGX_ERROR;
                    }
                    return ngx_rtmp_relay_play_local(s);

                } else {
                    if (ngx_rtmp_relay_send_play(s) != NGX_OK) {
                        return NGX_ERROR;
                    }
                    return ngx_rtmp_relay_publish_local(s);
                }

            default:
                return NGX_OK;
        }
    }

    This function first parses the received response and then dispatches on v.trans. Here v.trans matches NGX_RTMP_RELAY_CONNECT_TRANS, so ngx_rtmp_relay_send_create_stream is called.

    2.13.2 ngx_rtmp_relay_send_create_stream

    static ngx_int_t
    ngx_rtmp_relay_send_create_stream(ngx_rtmp_session_t *s)
    {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "rtmp relay: send create stream");

        static double               trans = NGX_RTMP_RELAY_CREATE_STREAM_TRANS;

        static ngx_rtmp_amf_elt_t   out_elts[] = {
            { NGX_RTMP_AMF_STRING, ngx_null_string, "createStream", 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 },
            { NGX_RTMP_AMF_NULL,   ngx_null_string, NULL, 0 }
        };

        ngx_rtmp_header_t           h;

        ngx_memzero(&h, sizeof(h));
        h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
        h.type = NGX_RTMP_MSG_AMF_CMD;

        return ngx_rtmp_send_amf(s, &h, out_elts,
                                 sizeof(out_elts) / sizeof(out_elts[0]));
    }

    This function builds the createStream packet and sends it to the server.

    2.14 Server receives from client (192.168.1.82:xxx): amf_cmd(20), createStream

    The server receives the createStream command message sent by the client.

    2.14.1 ngx_rtmp_cmd_create_stream_init

    static ngx_int_t
    ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
    {
        static ngx_rtmp_create_stream_t     v;

        static ngx_rtmp_amf_elt_t  in_elts[] = {
            { NGX_RTMP_AMF_NUMBER, ngx_null_string,
              &v.trans, sizeof(v.trans) },
        };

        /* parse the createStream command message to obtain v.trans */
        if (ngx_rtmp_receive_amf(s, in, in_elts,
                                 sizeof(in_elts) / sizeof(in_elts[0])))
        {
            return NGX_ERROR;
        }

        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream");

        return ngx_rtmp_create_stream(s, &v);
    }

    From this function, the chain of handlers registered on ngx_rtmp_create_stream is then invoked. The one reached here is
    ngx_rtmp_cmd_create_stream.

    2.14.2 ngx_rtmp_cmd_create_stream

    static ngx_int_t
    ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v)
    {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "rtmp cmd: create stream");

        /* support one message stream per connection */
        static double               stream;
        static double               trans;
        ngx_rtmp_header_t           h;

        static ngx_rtmp_amf_elt_t  out_elts[] = {
            { NGX_RTMP_AMF_STRING, ngx_null_string, "_result", 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 },
            { NGX_RTMP_AMF_NULL,   ngx_null_string, NULL, 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &stream, sizeof(stream) },
        };

        trans = v->trans;
        stream = NGX_RTMP_MSID;

        ngx_memzero(&h, sizeof(h));
        h.csid = NGX_RTMP_CSID_AMF_INI;
        h.type = NGX_RTMP_MSG_AMF_CMD;

        return ngx_rtmp_send_amf(s, &h, out_elts,
                                 sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK ?
               NGX_DONE : NGX_ERROR;
    }

    This function builds and sends the response to createStream.

    2.15 Client (192.168.1.82:xxx) receives from server: amf_cmd(20), _result()
    The client receives the server's response packet for createStream: _result()

    2.15.1 ngx_rtmp_relay_on_result

    static ngx_int_t
    ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
    {
        ngx_rtmp_relay_ctx_t       *ctx;

        static struct {
            double                  trans;
            u_char                  level[32];
            u_char                  code[128];
            u_char                  desc[1024];
        } v;

        static ngx_rtmp_amf_elt_t   in_inf[] = {
            { NGX_RTMP_AMF_STRING, ngx_string("level"),
              &v.level, sizeof(v.level) },
            { NGX_RTMP_AMF_STRING, ngx_string("code"),
              &v.code, sizeof(v.code) },
            { NGX_RTMP_AMF_STRING, ngx_string("description"),
              &v.desc, sizeof(v.desc) },
        };

        static ngx_rtmp_amf_elt_t   in_elts[] = {
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.trans, 0 },
            { NGX_RTMP_AMF_NULL,   ngx_null_string, NULL, 0 },
            { NGX_RTMP_AMF_OBJECT, ngx_null_string, in_inf, sizeof(in_inf) },
        };

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
        if (ctx == NULL || !s->relay) {
            return NGX_OK;
        }

        ngx_memzero(&v, sizeof(v));
        if (ngx_rtmp_receive_amf(s, in, in_elts,
                                 sizeof(in_elts) / sizeof(in_elts[0])))
        {
            return NGX_ERROR;
        }

        ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "relay: _result: level='%s' code='%s' description='%s'",
                       v.level, v.code, v.desc);

        switch ((ngx_int_t)v.trans) {
            case NGX_RTMP_RELAY_CONNECT_TRANS:
                return ngx_rtmp_relay_send_create_stream(s);

            case NGX_RTMP_RELAY_CREATE_STREAM_TRANS:
                if (ctx->publish != ctx && !s->static_relay) {
                    /* send the publish command to the server */
                    if (ngx_rtmp_relay_send_publish(s) != NGX_OK) {
                        return NGX_ERROR;
                    }
                    return ngx_rtmp_relay_play_local(s);

                } else {
                    if (ngx_rtmp_relay_send_play(s) != NGX_OK) {
                        return NGX_ERROR;
                    }
                    return ngx_rtmp_relay_publish_local(s);
                }

            default:
                return NGX_OK;
        }
    }

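    This function first parses the received response and then dispatches on v.trans. This time v.trans matches NGX_RTMP_RELAY_CREATE_STREAM_TRANS and the session is a push relay, so ngx_rtmp_relay_send_publish is called, followed by ngx_rtmp_relay_play_local.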
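
    The two passes through ngx_rtmp_relay_on_result can be summarised as a small decision function keyed on the transaction number. This is a simplified sketch, not the module's code: the enum names and relay_next_step are hypothetical, the value 1 for connect matches the transaction number 1.0 seen in the AMF dump of section 2.11, the value 2 for createStream is an assumption, and the condition (ctx->publish != ctx && !s->static_relay) is reduced to a single is_push flag.

```c
#include <assert.h>

/* Hypothetical stand-ins for the NGX_RTMP_RELAY_*_TRANS constants.
 * 1 for connect matches the AMF dump; 2 for createStream is assumed. */
enum { TRANS_CONNECT = 1, TRANS_CREATE_STREAM = 2 };

typedef enum {
    NEXT_NONE,           /* unknown transaction: nothing to do */
    NEXT_CREATE_STREAM,  /* connect succeeded: send createStream */
    NEXT_PUBLISH,        /* push relay: send publish, then play locally */
    NEXT_PLAY            /* pull relay: send play, then publish locally */
} relay_next_t;

/* Decide the next relay step from the transaction number of a _result. */
static relay_next_t
relay_next_step(double trans, int is_push)
{
    assert(is_push == 0 || is_push == 1);

    switch ((int) trans) {
    case TRANS_CONNECT:
        return NEXT_CREATE_STREAM;
    case TRANS_CREATE_STREAM:
        return is_push ? NEXT_PUBLISH : NEXT_PLAY;
    default:
        return NEXT_NONE;
    }
}
```

    For the push relay analysed here, the first _result leads to createStream and the second one to publish.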

    2.15.2 ngx_rtmp_relay_send_publish

    static ngx_int_t
    ngx_rtmp_relay_send_publish(ngx_rtmp_session_t *s)
    {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "rtmp relay: send publish");

        static double               trans;

        static ngx_rtmp_amf_elt_t   out_elts[] = {
            { NGX_RTMP_AMF_STRING, ngx_null_string, "publish", 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans, 0 },
            { NGX_RTMP_AMF_NULL,   ngx_null_string, NULL, 0 },
            { NGX_RTMP_AMF_STRING, ngx_null_string, NULL, 0 }, /* <- to fill */
            { NGX_RTMP_AMF_STRING, ngx_null_string, "live", 0 }
        };

        ngx_rtmp_header_t           h;
        ngx_rtmp_relay_ctx_t       *ctx;

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
        if (ctx == NULL) {
            return NGX_ERROR;
        }

        if (ctx->play_path.len) {
            out_elts[3].data = ctx->play_path.data;
            out_elts[3].len  = ctx->play_path.len;
        } else {
            out_elts[3].data = ctx->name.data;
            out_elts[3].len  = ctx->name.len;
        }

        ngx_memzero(&h, sizeof(h));
        h.csid = NGX_RTMP_RELAY_CSID_AMF;
        h.msid = NGX_RTMP_RELAY_MSID;
        h.type = NGX_RTMP_MSG_AMF_CMD;

        return ngx_rtmp_send_amf(s, &h, out_elts,
                                 sizeof(out_elts) / sizeof(out_elts[0]));
    }

    2.15.3 ngx_rtmp_relay_play_local

    static ngx_int_t
    ngx_rtmp_relay_play_local(ngx_rtmp_session_t *s)
    {
        ngx_rtmp_play_t             v;
        ngx_rtmp_relay_ctx_t       *ctx;

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
        if (ctx == NULL) {
            return NGX_ERROR;
        }

        ngx_memzero(&v, sizeof(ngx_rtmp_play_t));
        v.silent = 1;
        *(ngx_cpymem(v.name, ctx->name.data,
                     ngx_min(sizeof(v.name) - 1, ctx->name.len))) = 0;

        return ngx_rtmp_play(s, &v);
    }

    This function in turn invokes the chain of handlers registered on ngx_rtmp_play; the main one called here is ngx_rtmp_live_play.

    2.15.4 ngx_rtmp_live_play

    static ngx_int_t
    ngx_rtmp_live_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
    {
        ngx_rtmp_live_app_conf_t       *lacf;
        ngx_rtmp_live_ctx_t            *ctx;

        lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

        if (lacf == NULL || !lacf->live) {
            goto next;
        }

        ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "live: play: name='%s' start=%uD duration=%uD reset=%d",
                       v->name, (uint32_t) v->start,
                       (uint32_t) v->duration, (uint32_t) v->reset);

        /* join stream as subscriber */

        ngx_rtmp_live_join(s, v->name, 0);

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
        if (ctx == NULL) {
            goto next;
        }

        ctx->silent = v->silent;

        if (!ctx->silent && !lacf->play_restart) {
            ngx_rtmp_send_status(s, "NetStream.Play.Start",
                                 "status", "Start live");
            ngx_rtmp_send_sample_access(s);
        }

    next:
        return next_play(s, v);
    }

    2.16 Server receives from client (192.168.1.82:xxx): amf_cmd(20), publish('test')

    The server receives the publish command from the client.

    2.16.1 ngx_rtmp_cmd_publish_init

    static ngx_int_t
    ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
    {
        static ngx_rtmp_publish_t       v;

        static ngx_rtmp_amf_elt_t      in_elts[] = {

            /* transaction is always 0 */
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, NULL, 0 },
            { NGX_RTMP_AMF_NULL,   ngx_null_string, NULL, 0 },
            { NGX_RTMP_AMF_STRING, ngx_null_string,
              &v.name, sizeof(v.name) },
            { NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_STRING, ngx_null_string,
              &v.type, sizeof(v.type) },
        };

        ngx_memzero(&v, sizeof(v));

        if (ngx_rtmp_receive_amf(s, in, in_elts,
                                 sizeof(in_elts) / sizeof(in_elts[0])))
        {
            return NGX_ERROR;
        }

        ngx_rtmp_cmd_fill_args(v.name, v.args);

        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "publish: name='%s' args='%s' type=%s silent=%d",
                      v.name, v.args, v.type, v.silent);

        return ngx_rtmp_publish(s, &v);
    }

    The application this client connected to is live, and that application{} block has no push directive, so the main handler called here is ngx_rtmp_live_publish.

    2.16.2 ngx_rtmp_live_publish

    static ngx_int_t
    ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
    {
        ngx_rtmp_live_app_conf_t       *lacf;
        ngx_rtmp_live_ctx_t            *ctx;

        lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

        if (lacf == NULL || !lacf->live) {
            goto next;
        }

        ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "live: publish: name='%s' type='%s'",
                       v->name, v->type);

        /* join stream as publisher */

        ngx_rtmp_live_join(s, v->name, 1);

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
        if (ctx == NULL || !ctx->publishing) {
            goto next;
        }

        ctx->silent = v->silent;

        if (!ctx->silent) {
            /* send the response to publish */
            ngx_rtmp_send_status(s, "NetStream.Publish.Start",
                                 "status", "Start publishing");
        }

    next:
        return next_publish(s, v);
    }

    2.17 Client (192.168.1.82:xxx) receives from server: amf_cmd(20), onStatus

    The client receives the server's response to publish, which means the client may now publish the stream to the server.

    2.17.1 ngx_rtmp_relay_on_status

    static ngx_int_t
    ngx_rtmp_relay_on_status(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
    {
        ngx_rtmp_relay_ctx_t       *ctx;

        static struct {
            double                  trans;
            u_char                  level[32];
            u_char                  code[128];
            u_char                  desc[1024];
        } v;

        static ngx_rtmp_amf_elt_t   in_inf[] = {
            { NGX_RTMP_AMF_STRING, ngx_string("level"),
              &v.level, sizeof(v.level) },
            { NGX_RTMP_AMF_STRING, ngx_string("code"),
              &v.code, sizeof(v.code) },
            { NGX_RTMP_AMF_STRING, ngx_string("description"),
              &v.desc, sizeof(v.desc) },
        };

        static ngx_rtmp_amf_elt_t   in_elts[] = {
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &v.trans, 0 },
            { NGX_RTMP_AMF_NULL,   ngx_null_string, NULL, 0 },
            { NGX_RTMP_AMF_OBJECT, ngx_null_string, in_inf, sizeof(in_inf) },
        };

        static ngx_rtmp_amf_elt_t   in_elts_meta[] = {
            { NGX_RTMP_AMF_OBJECT, ngx_null_string, in_inf, sizeof(in_inf) },
        };

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
        if (ctx == NULL || !s->relay) {
            return NGX_OK;
        }

        ngx_memzero(&v, sizeof(v));
        if (h->type == NGX_RTMP_MSG_AMF_META) {
            ngx_rtmp_receive_amf(s, in, in_elts_meta,
                                 sizeof(in_elts_meta) / sizeof(in_elts_meta[0]));
        } else {
            ngx_rtmp_receive_amf(s, in, in_elts,
                                 sizeof(in_elts) / sizeof(in_elts[0]));
        }

        ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "relay: onStatus: level='%s' code='%s' description='%s'",
                       v.level, v.code, v.desc);

        return NGX_OK;
    }

    2.18 Server receives from client (obs): audio(8)

    The server receives an audio packet sent by the client obs.

    Figure (14): receive: audio(8)

    For NGX_RTMP_MSG_AUDIO(8), the following rtmp modules register a callback:

    • ngx_rtmp_dash_module
    • ngx_rtmp_hls_module
    • ngx_rtmp_live_module
    • ngx_rtmp_record_module
    • ngx_rtmp_codec_module

    Only the callbacks registered by the codec and live modules matter here. The callback registered by ngx_rtmp_codec_module, ngx_rtmp_codec_av, is called first.
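The calling order described above can be modeled with a small sketch. This is not nginx-rtmp's own code: `handler_fn`, `trace_t`, `register_handler` and `dispatch` are hypothetical names invented for illustration, and the stand-in handlers only record that they ran. It assumes nothing more than what the text says, namely that several handlers are registered for message type 8 and that the codec handler runs before the live handler.

```c
#include <assert.h>
#include <stddef.h>

#define MSG_AUDIO     8    /* same numeric value as NGX_RTMP_MSG_AUDIO */
#define MAX_TYPES     32
#define MAX_HANDLERS  8

/* Hypothetical trace buffer: each handler appends one letter. */
typedef struct {
    char    buf[16];
    size_t  len;
} trace_t;

/* Hypothetical handler signature; 0 means success. */
typedef int (*handler_fn)(int msg_type, trace_t *t);

static handler_fn  handlers[MAX_TYPES][MAX_HANDLERS];
static size_t      nhandlers[MAX_TYPES];

/* Append a handler to the list kept for one message type. */
static void
register_handler(int msg_type, handler_fn fn)
{
    assert(msg_type >= 0 && msg_type < MAX_TYPES);
    assert(nhandlers[msg_type] < MAX_HANDLERS);
    handlers[msg_type][nhandlers[msg_type]++] = fn;
}

static void
trace_add(trace_t *t, char c)
{
    if (t->len < sizeof(t->buf)) {
        t->buf[t->len++] = c;
    }
}

/* Stand-ins for ngx_rtmp_codec_av and ngx_rtmp_live_av. */
static int
codec_av(int msg_type, trace_t *t)
{
    (void) msg_type;
    trace_add(t, 'C');
    return 0;
}

static int
live_av(int msg_type, trace_t *t)
{
    (void) msg_type;
    trace_add(t, 'L');
    return 0;
}

/* Run every handler for msg_type in registration order.
 * In this sketch a non-zero return stops the chain. */
static int
dispatch(int msg_type, trace_t *t)
{
    size_t  i;

    assert(msg_type >= 0 && msg_type < MAX_TYPES);

    for (i = 0; i < nhandlers[msg_type]; i++) {
        if (handlers[msg_type][i](msg_type, t) != 0) {
            return -1;
        }
    }

    return 0;
}
```

Registering `codec_av` and then `live_av` for `MSG_AUDIO` and calling `dispatch(MSG_AUDIO, &t)` leaves the letters `C` and `L` in the trace, in that order.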

    2.18.1 ngx_rtmp_codec_av

    static ngx_int_t
    ngx_rtmp_codec_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
                      ngx_chain_t *in)
    {
        ngx_rtmp_core_srv_conf_t           *cscf;
        ngx_rtmp_codec_ctx_t               *ctx;
        ngx_chain_t                       **header;
        uint8_t                             fmt;
        static ngx_uint_t                   sample_rates[] =
                                            { 5512, 11025, 22050, 44100 };

        if (h->type != NGX_RTMP_MSG_AUDIO && h->type != NGX_RTMP_MSG_VIDEO) {
            return NGX_OK;
        }

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
        if (ctx == NULL) {
            ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_codec_ctx_t));
            ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_codec_module);
        }

        /* save codec */
        if (in->buf->last - in->buf->pos < 1) {
            return NGX_OK;
        }

        fmt = in->buf->pos[0];
        if (h->type == NGX_RTMP_MSG_AUDIO) {
            ctx->audio_codec_id = (fmt & 0xf0) >> 4;
            ctx->audio_channels = (fmt & 0x01) + 1;
            ctx->sample_size = (fmt & 0x02) ? 2 : 1;

            if (ctx->sample_rate == 0) {
                ctx->sample_rate = sample_rates[(fmt & 0x0c) >> 2];
            }
        } else {
            ctx->video_codec_id = (fmt & 0x0f);
        }

        /* save AVC/AAC header */
        if (in->buf->last - in->buf->pos < 3) {
            return NGX_OK;
        }

        /* no conf */
        if (!ngx_rtmp_is_codec_header(in)) {
            return NGX_OK;
        }

        cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
        header = NULL;

        if (h->type == NGX_RTMP_MSG_AUDIO) {
            if (ctx->audio_codec_id == NGX_RTMP_AUDIO_AAC) {
                header = &ctx->aac_header;
                ngx_rtmp_codec_parse_aac_header(s, in);
            }
        } else {
            if (ctx->video_codec_id == NGX_RTMP_VIDEO_H264) {
                header = &ctx->avc_header;
                ngx_rtmp_codec_parse_avc_header(s, in);
            }
        }

        if (header == NULL) {
            return NGX_OK;
        }

        if (*header) {
            ngx_rtmp_free_shared_chain(cscf, *header);
        }

        *header = ngx_rtmp_append_shared_bufs(cscf, NULL, in);

        return NGX_OK;
    }
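To make the bit masks in ngx_rtmp_codec_av concrete, here is a standalone sketch that decodes the first byte of an audio message the same way. The `audio_fmt_t` struct and `decode_audio_fmt` function are hypothetical names used only for this illustration; the masks and the sample rate table are copied from the listing above.

```c
#include <assert.h>   /* only needed for the checks in the usage note */
#include <stdint.h>

/* Hypothetical container for the fields packed into the first byte. */
typedef struct {
    unsigned  codec_id;     /* upper 4 bits */
    unsigned  sample_rate;  /* Hz, looked up from a 2-bit index */
    unsigned  sample_size;  /* 1 or 2, from bit 1 */
    unsigned  channels;     /* 1 or 2, from bit 0 */
} audio_fmt_t;

/* Split the format byte with the same masks as ngx_rtmp_codec_av. */
static audio_fmt_t
decode_audio_fmt(uint8_t fmt)
{
    static const unsigned  sample_rates[] = { 5512, 11025, 22050, 44100 };
    audio_fmt_t            a;

    a.codec_id    = (fmt & 0xf0) >> 4;
    a.sample_rate = sample_rates[(fmt & 0x0c) >> 2];
    a.sample_size = (fmt & 0x02) ? 2 : 1;
    a.channels    = (fmt & 0x01) + 1;

    return a;
}
```

For the byte 0xAF (binary 1010 1111), which is typical for AAC audio, this gives codec_id 10, sample_rate 44100, sample_size 2 and channels 2. Note that for AAC the real sample rate comes from the AAC sequence header parsed in 2.18.2, not from this byte.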

    2.18.2 ngx_rtmp_codec_parse_aac_header

    static void
    ngx_rtmp_codec_parse_aac_header(ngx_rtmp_session_t *s, ngx_chain_t *in)
    {
        ngx_uint_t              idx;
        ngx_rtmp_codec_ctx_t   *ctx;
        ngx_rtmp_bit_reader_t   br;

        static ngx_uint_t       aac_sample_rates[] =
            { 96000, 88200, 64000, 48000,
              44100, 32000, 24000, 22050,
              16000, 12000, 11025,  8000,
               7350,     0,     0,     0 };

    #if (NGX_DEBUG)
        ngx_rtmp_codec_dump_header(s, "aac", in);
    #endif

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);

        ngx_rtmp_bit_init_reader(&br, in->buf->pos, in->buf->last);

        /* read 16 bits; the value is discarded, i.e. skip 16 bits */
        ngx_rtmp_bit_read(&br, 16);

        /* read the 5-bit aac_profile value */
        ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&br, 5);
        if (ctx->aac_profile == 31) {
            ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&br, 6) + 32;
        }

        idx = (ngx_uint_t) ngx_rtmp_bit_read(&br, 4);
        if (idx == 15) {
            ctx->sample_rate = (ngx_uint_t) ngx_rtmp_bit_read(&br, 24);
        } else {
            ctx->sample_rate = aac_sample_rates[idx];
        }

        ctx->aac_chan_conf = (ngx_uint_t) ngx_rtmp_bit_read(&br, 4);

        if (ctx->aac_profile == 5 || ctx->aac_profile == 29) {

            if (ctx->aac_profile == 29) {
                ctx->aac_ps = 1;
            }

            ctx->aac_sbr = 1;

            idx = (ngx_uint_t) ngx_rtmp_bit_read(&br, 4);
            if (idx == 15) {
                ctx->sample_rate = (ngx_uint_t) ngx_rtmp_bit_read(&br, 24);
            } else {
                ctx->sample_rate = aac_sample_rates[idx];
            }

            ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&br, 5);
            if (ctx->aac_profile == 31) {
                ctx->aac_profile = (ngx_uint_t) ngx_rtmp_bit_read(&br, 6) + 32;
            }
        }

        /* MPEG-4 Audio Specific Config

           5 bits: object type
           if (object type == 31)
             6 bits + 32: object type
           4 bits: frequency index
           if (frequency index == 15)
             24 bits: frequency
           4 bits: channel configuration

           if (object_type == 5)
               4 bits: frequency index
               if (frequency index == 15)
                 24 bits: frequency
               5 bits: object type
               if (object type == 31)
                 6 bits + 32: object type

           var bits: AOT Specific Config
         */

        ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "codec: aac header profile=%ui, "
                       "sample_rate=%ui, chan_conf=%ui",
                       ctx->aac_profile, ctx->sample_rate, ctx->aac_chan_conf);
    }

    2.18.3 ngx_rtmp_bit_init_reader

    void
    ngx_rtmp_bit_init_reader(ngx_rtmp_bit_reader_t *br, u_char *pos, u_char *last)
    {
        ngx_memzero(br, sizeof(ngx_rtmp_bit_reader_t));

        br->pos = pos;
        br->last = last;
    }

    This function initializes a bit reader over the byte range [pos, last).

    2.18.4 ngx_rtmp_bit_read

    uint64_t
    ngx_rtmp_bit_read(ngx_rtmp_bit_reader_t *br, ngx_uint_t n)
    {
        uint64_t    v;
        ngx_uint_t  d;

        v = 0;

        while (n) {

            /* if the end of the buffer has been reached, set the error flag */
            if (br->pos >= br->last) {
                br->err = 1;
                return 0;
            }

            /* take at most the bits that remain in the current byte */
            d = (br->offs + n > 8 ? (ngx_uint_t) (8 - br->offs) : n);

            v <<= d;
            /* append the bits just read to v */
            v += (*br->pos >> (8 - br->offs - d)) & ((u_char) 0xff >> (8 - d));

            /* advance the bit reader's offset offs */
            br->offs += d;
            n -= d;

            /* once the offset reaches 8, move to the next byte and reset it */
            if (br->offs == 8) {
                br->pos++;
                br->offs = 0;
            }
        }

        return v;
    }
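As a worked example of the bit reader, the sketch below re-implements the same loop on plain C types and uses it to parse a four-byte AAC sequence header. The names `bit_reader_t`, `bit_read` and `parse_aac_header` are hypothetical stand-ins, not the nginx-rtmp symbols, and the SBR/PS extension branch of ngx_rtmp_codec_parse_aac_header is deliberately left out. The sample bytes AF 00 12 10 are an assumption chosen for illustration: two audio tag header bytes followed by a config that describes AAC LC, 44100 Hz, stereo.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical plain-C equivalent of ngx_rtmp_bit_reader_t. */
typedef struct {
    const uint8_t  *pos;   /* current byte */
    const uint8_t  *last;  /* one past the final byte */
    unsigned        offs;  /* bits already consumed in *pos (0..7) */
    int             err;   /* set when reading past the end */
} bit_reader_t;

/* Read n bits, most significant bit first, as in ngx_rtmp_bit_read. */
static uint64_t
bit_read(bit_reader_t *br, unsigned n)
{
    uint64_t  v = 0;
    unsigned  d;

    assert(n <= 64);

    while (n) {
        if (br->pos >= br->last) {
            br->err = 1;
            return 0;
        }

        /* never read across a byte boundary in one step */
        d = (br->offs + n > 8) ? 8 - br->offs : n;

        v <<= d;
        v += (*br->pos >> (8 - br->offs - d)) & (0xffu >> (8 - d));

        br->offs += d;
        n -= d;

        if (br->offs == 8) {
            br->pos++;
            br->offs = 0;
        }
    }

    return v;
}

/* Simplified parse: skip 16 bits, then object type, rate, channels.
 * Returns 0 on success, -1 if the buffer was too short. */
static int
parse_aac_header(const uint8_t *buf, size_t len,
                 unsigned *profile, unsigned *rate, unsigned *chan)
{
    static const unsigned  rates[16] =
        { 96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050,
          16000, 12000, 11025,  8000,  7350,     0,     0,     0 };
    bit_reader_t           br = { buf, buf + len, 0, 0 };
    unsigned               idx;

    bit_read(&br, 16);                       /* skipped, value unused */

    *profile = (unsigned) bit_read(&br, 5);
    if (*profile == 31) {
        *profile = (unsigned) bit_read(&br, 6) + 32;
    }

    idx = (unsigned) bit_read(&br, 4);
    *rate = (idx == 15) ? (unsigned) bit_read(&br, 24) : rates[idx];

    *chan = (unsigned) bit_read(&br, 4);

    return br.err ? -1 : 0;
}
```

With the bytes AF 00 12 10, the 4-bit frequency index straddles the boundary between 0x12 and 0x10, so the loop takes 3 bits from the first byte and 1 bit from the second. The result is profile 2, sample rate 44100 and channel configuration 2.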

    2.18.5 ngx_rtmp_live_av

    static ngx_int_t
    ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
                     ngx_chain_t *in)
    {
        ngx_rtmp_live_ctx_t            *ctx, *pctx;
        ngx_rtmp_codec_ctx_t           *codec_ctx;
        ngx_chain_t                    *header, *coheader, *meta,
                                       *apkt, *aapkt, *acopkt, *rpkt;
        ngx_rtmp_core_srv_conf_t       *cscf;
        ngx_rtmp_live_app_conf_t       *lacf;
        ngx_rtmp_session_t             *ss;
        ngx_rtmp_header_t               ch, lh, clh;
        ngx_int_t                       rc, mandatory, dummy_audio;
        ngx_uint_t                      prio;
        ngx_uint_t                      peers;
        ngx_uint_t                      meta_version;
        ngx_uint_t                      csidx;
        uint32_t                        delta;
        ngx_rtmp_live_chunk_stream_t   *cs;
    #ifdef NGX_DEBUG
        const char                     *type_s;

        type_s = (h->type == NGX_RTMP_MSG_VIDEO ? "video" : "audio");
    #endif

        lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);
        if (lacf == NULL) {
            return NGX_ERROR;
        }

        if (!lacf->live || in == NULL || in->buf == NULL) {
            return NGX_OK;
        }

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
        if (ctx == NULL || ctx->stream == NULL) {
            return NGX_OK;
        }

        if (ctx->publishing == 0) {
            ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                           "live: %s from non-publisher", type_s);
            return NGX_OK;
        }

        /* if the current stream is not yet active */
        if (!ctx->stream->active) {
            ngx_rtmp_live_start(s);
        }

        if (ctx->idle_evt.timer_set) {
            ngx_add_timer(&ctx->idle_evt, lacf->idle_timeout);
        }

        ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "live: %s packet timestamp=%uD",
                       type_s, h->timestamp);

        s->current_time = h->timestamp;

        peers = 0;
        apkt = NULL;
        aapkt = NULL;
        acopkt = NULL;
        header = NULL;
        coheader = NULL;
        meta = NULL;
        meta_version = 0;
        mandatory = 0;

        prio = (h->type == NGX_RTMP_MSG_VIDEO ?
                ngx_rtmp_get_video_frame_type(in) : 0);

        cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

        csidx = !(lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO);

        cs = &ctx->cs[csidx];

        ngx_memzero(&ch, sizeof(ch));

        ch.timestamp = h->timestamp;
        ch.msid = NGX_RTMP_MSID;
        ch.csid = cs->csid;
        ch.type = h->type;

        lh = ch;

        if (cs->active) {
            lh.timestamp = cs->timestamp;
        }

        clh = lh;
        clh.type = (h->type == NGX_RTMP_MSG_AUDIO ? NGX_RTMP_MSG_VIDEO :
                                                    NGX_RTMP_MSG_AUDIO);

        cs->active = 1;
        cs->timestamp = ch.timestamp;

        delta = ch.timestamp - lh.timestamp;
    /*
        if (delta >> 31) {
            ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                           "live: clipping non-monotonical timestamp %uD->%uD",
                           lh.timestamp, ch.timestamp);

            delta = 0;

            ch.timestamp = lh.timestamp;
        }
    */
        rpkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);

        ngx_rtmp_prepare_message(s, &ch, &lh, rpkt);

        codec_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);

        if (codec_ctx) {

            if (h->type == NGX_RTMP_MSG_AUDIO) {
                header = codec_ctx->aac_header;

                if (lacf->interleave) {
                    coheader = codec_ctx->avc_header;
                }

                if (codec_ctx->audio_codec_id == NGX_RTMP_AUDIO_AAC &&
                    ngx_rtmp_is_codec_header(in))
                {
                    prio = 0;
                    mandatory = 1;
                }

            } else {
                header = codec_ctx->avc_header;

                if (lacf->interleave) {
                    coheader = codec_ctx->aac_header;
                }

                if (codec_ctx->video_codec_id == NGX_RTMP_VIDEO_H264 &&
                    ngx_rtmp_is_codec_header(in))
                {
                    prio = 0;
                    mandatory = 1;
                }
            }

            if (codec_ctx->meta) {
                meta = codec_ctx->meta;
                meta_version = codec_ctx->meta_version;
            }
        }

        /* broadcast to all subscribers */

        for (pctx = ctx->stream->ctx; pctx; pctx = pctx->next) {
            if (pctx == ctx || pctx->paused) {
                continue;
            }

            ss = pctx->session;
            cs = &pctx->cs[csidx];

            /* send metadata */

            if (meta && meta_version != pctx->meta_version) {
                ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: meta");

                if (ngx_rtmp_send_message(ss, meta, 0) == NGX_OK) {
                    pctx->meta_version = meta_version;
                }
            }

            /* sync stream */

            if (cs->active && (lacf->sync && cs->dropped > lacf->sync)) {
                ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                               "live: sync %s dropped=%uD", type_s, cs->dropped);

                cs->active = 0;
                cs->dropped = 0;
            }

            /* absolute packet */

            if (!cs->active) {

                if (mandatory) {
                    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                                   "live: skipping header");
                    continue;
                }

                if (lacf->wait_video && h->type == NGX_RTMP_MSG_AUDIO &&
                    !pctx->cs[0].active)
                {
                    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                                   "live: waiting for video");
                    continue;
                }

                if (lacf->wait_key && prio != NGX_RTMP_VIDEO_KEY_FRAME &&
                    (lacf->interleave || h->type == NGX_RTMP_MSG_VIDEO))
                {
                    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                                   "live: skip non-key");
                    continue;
                }

                dummy_audio = 0;
                if (lacf->wait_video && h->type == NGX_RTMP_MSG_VIDEO &&
                    !pctx->cs[1].active)
                {
                    dummy_audio = 1;
                    if (aapkt == NULL) {
                        aapkt = ngx_rtmp_alloc_shared_buf(cscf);
                        ngx_rtmp_prepare_message(s, &clh, NULL, aapkt);
                    }
                }

                if (header || coheader) {

                    /* send absolute codec header */

                    ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                                   "live: abs %s header timestamp=%uD",
                                   type_s, lh.timestamp);

                    if (header) {
                        if (apkt == NULL) {
                            apkt = ngx_rtmp_append_shared_bufs(cscf, NULL, header);
                            ngx_rtmp_prepare_message(s, &lh, NULL, apkt);
                        }

                        rc = ngx_rtmp_send_message(ss, apkt, 0);
                        if (rc != NGX_OK) {
                            continue;
                        }
                    }

                    if (coheader) {
                        if (acopkt == NULL) {
                            acopkt = ngx_rtmp_append_shared_bufs(cscf, NULL,
                                                                 coheader);
                            ngx_rtmp_prepare_message(s, &clh, NULL, acopkt);
                        }

                        rc = ngx_rtmp_send_message(ss, acopkt, 0);
                        if (rc != NGX_OK) {
                            continue;
                        }

                    } else if (dummy_audio) {
                        ngx_rtmp_send_message(ss, aapkt, 0);
                    }

                    cs->timestamp = lh.timestamp;
                    cs->active = 1;
                    ss->current_time = cs->timestamp;

                } else {

                    /* send absolute packet */

                    ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                                   "live: abs %s packet timestamp=%uD",
                                   type_s, ch.timestamp);

                    if (apkt == NULL) {
                        apkt = ngx_rtmp_append_shared_bufs(cscf, NULL, in);
                        ngx_rtmp_prepare_message(s, &ch, NULL, apkt);
                    }

                    rc = ngx_rtmp_send_message(ss, apkt, prio);
                    if (rc != NGX_OK) {
                        continue;
                    }

                    cs->timestamp = ch.timestamp;
                    cs->active = 1;
                    ss->current_time = cs->timestamp;

                    ++peers;

                    if (dummy_audio) {
                        ngx_rtmp_send_message(ss, aapkt, 0);
                    }

                    continue;
                }
            }

            /* send relative packet */

            ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                           "live: rel %s packet delta=%uD",
                           type_s, delta);

            if (ngx_rtmp_send_message(ss, rpkt, prio) != NGX_OK) {
                ++pctx->ndropped;

                cs->dropped += delta;

                if (mandatory) {
                    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, ss->connection->log, 0,
                                   "live: mandatory packet failed");
                    ngx_rtmp_finalize_session(ss);
                }

                continue;
            }

            cs->timestamp += delta;
            ++peers;
            ss->current_time = cs->timestamp;
        }

        if (rpkt) {
            ngx_rtmp_free_shared_chain(cscf, rpkt);
        }

        if (apkt) {
            ngx_rtmp_free_shared_chain(cscf, apkt);
        }

        if (aapkt) {
            ngx_rtmp_free_shared_chain(cscf, aapkt);
        }

        if (acopkt) {
            ngx_rtmp_free_shared_chain(cscf, acopkt);
        }

        ngx_rtmp_update_bandwidth(&ctx->stream->bw_in, h->mlen);
        ngx_rtmp_update_bandwidth(&ctx->stream->bw_out, h->mlen * peers);

        ngx_rtmp_update_bandwidth(h->type == NGX_RTMP_MSG_AUDIO ?
                                  &ctx->stream->bw_in_audio :
                                  &ctx->stream->bw_in_video,
                                  h->mlen);

        return NGX_OK;
    }

    This function forwards the audio and video data received from the client obs to the subscribers of the stream, which here is application live.
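Two small calculations in ngx_rtmp_live_av decide how each packet is forwarded: which chunk stream slot it uses, and the timestamp delta carried by a relative packet. The sketch below isolates both. The function names `chunk_stream_index`, `timestamp_delta` and `is_non_monotonic` are hypothetical; the expressions are taken from the listing, and the non-monotonic check corresponds to the `delta >> 31` test that is commented out in the source.

```c
#include <assert.h>   /* only needed for the checks in the usage note */
#include <stdint.h>

#define MSG_AUDIO  8   /* same numeric value as NGX_RTMP_MSG_AUDIO */
#define MSG_VIDEO  9   /* same numeric value as NGX_RTMP_MSG_VIDEO */

/* Slot 0 carries video, slot 1 carries audio; with interleave enabled
 * both message types share slot 0. */
static unsigned
chunk_stream_index(int interleave, int msg_type)
{
    return !(interleave || msg_type == MSG_VIDEO);
}

/* Delta between the previous and current timestamp, modulo 2^32. */
static uint32_t
timestamp_delta(uint32_t last, uint32_t cur)
{
    return (uint32_t) (cur - last);
}

/* A delta with the top bit set means the timestamp went backwards. */
static int
is_non_monotonic(uint32_t delta)
{
    return (int) (delta >> 31);
}
```

For example, an audio packet without interleave uses slot 1, a packet at timestamp 1040 following one at 1000 has delta 40, and going from 1040 back to 1000 yields a delta whose top bit is set.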

    2.18.6 ngx_rtmp_live_start

    static void
    ngx_rtmp_live_start(ngx_rtmp_session_t *s)
    {
        ngx_rtmp_core_srv_conf_t   *cscf;
        ngx_rtmp_live_app_conf_t   *lacf;
        ngx_chain_t                *control;
        ngx_chain_t                *status[3];
        size_t                      n, nstatus;

        cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

        lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

        /* build the stream_begin rtmp packet */
        control = ngx_rtmp_create_stream_begin(s, NGX_RTMP_MSID);

        nstatus = 0;

        if (lacf->play_restart) {
            status[nstatus++] = ngx_rtmp_create_status(s, "NetStream.Play.Start",
                                                       "status", "Start live");
            status[nstatus++] = ngx_rtmp_create_sample_access(s);
        }

        if (lacf->publish_notify) {
            status[nstatus++] = ngx_rtmp_create_status(s,
                                                     "NetStream.Play.PublishNotify",
                                                     "status", "Start publishing");
        }

        ngx_rtmp_live_set_status(s, control, status, nstatus, 1);

        if (control) {
            ngx_rtmp_free_shared_chain(cscf, control);
        }

        for (n = 0; n < nstatus; ++n) {
            ngx_rtmp_free_shared_chain(cscf, status[n]);
        }
    }

    2.19 Server receives from client (obs): video(9)

    Figure (15): receive: video(9)

    2.19.1 ngx_rtmp_codec_av

    This is the same ngx_rtmp_codec_av function listed in 2.18.1. For a video message it stores video_codec_id and, when the packet is an H.264 codec header, saves it in avc_header and calls ngx_rtmp_codec_parse_avc_header.

    2.19.2 ngx_rtmp_codec_parse_avc_header

    static void
    ngx_rtmp_codec_parse_avc_header(ngx_rtmp_session_t *s, ngx_chain_t *in)
    {
        ngx_uint_t              profile_idc, width, height, crop_left, crop_right,
                                crop_top, crop_bottom, frame_mbs_only, n, cf_idc,
                                num_ref_frames;
        ngx_rtmp_codec_ctx_t   *ctx;
        ngx_rtmp_bit_reader_t   br;

    #if (NGX_DEBUG)
        ngx_rtmp_codec_dump_header(s, "avc", in);
    #endif

        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);

        ngx_rtmp_bit_init_reader(&br, in->buf->pos, in->buf->last);

        ngx_rtmp_bit_read(&br, 48);

        ctx->avc_profile = (ngx_uint_t) ngx_rtmp_bit_read_8(&br);
        ctx->avc_compat = (ngx_uint_t) ngx_rtmp_bit_read_8(&br);
        ctx->avc_level = (ngx_uint_t) ngx_rtmp_bit_read_8(&br);

        /* nal bytes */
        ctx->avc_nal_bytes = (ngx_uint_t) ((ngx_rtmp_bit_read_8(&br) & 0x03) + 1);

        /* nnals */
        if ((ngx_rtmp_bit_read_8(&br) & 0x1f) == 0) {
            return;
        }

        /* nal size */
        ngx_rtmp_bit_read(&br, 16);

        /* nal type */
        if (ngx_rtmp_bit_read_8(&br) != 0x67) {
            return;
        }

        /* SPS */

        /* profile idc */
        profile_idc = (ngx_uint_t) ngx_rtmp_bit_read(&br, 8);

        /* flags */
        ngx_rtmp_bit_read(&br, 8);

        /* level idc */
        ngx_rtmp_bit_read(&br, 8);

        /* SPS id */
        ngx_rtmp_bit_read_golomb(&br);

        if (profile_idc == 100 || profile_idc == 110 ||
            profile_idc == 122 || profile_idc == 244 || profile_idc == 44 ||
            profile_idc == 83 || profile_idc == 86 || profile_idc == 118)
        {
            /* chroma format idc */
            cf_idc = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);

            if (cf_idc == 3) {

                /* separate color plane */
                ngx_rtmp_bit_read(&br, 1);
            }

            /* bit depth luma - 8 */
            ngx_rtmp_bit_read_golomb(&br);

            /* bit depth chroma - 8 */
            ngx_rtmp_bit_read_golomb(&br);

            /* qpprime y zero transform bypass */
            ngx_rtmp_bit_read(&br, 1);

            /* seq scaling matrix present */
            if (ngx_rtmp_bit_read(&br, 1)) {

                for (n = 0; n < (cf_idc != 3 ? 8u : 12u); n++) {

                    /* seq scaling list present */
                    if (ngx_rtmp_bit_read(&br, 1)) {

                        /* TODO: scaling_list()
                        if (n < 6) {
                        } else {
                        }
                        */
                    }
                }
            }
        }

        /* log2 max frame num */
        ngx_rtmp_bit_read_golomb(&br);

        /* pic order cnt type */
        switch (ngx_rtmp_bit_read_golomb(&br)) {
        case 0:

            /* max pic order cnt */
            ngx_rtmp_bit_read_golomb(&br);
            break;

        case 1:

            /* delta pic order always zero */
            ngx_rtmp_bit_read(&br, 1);

            /* offset for non-ref pic */
            ngx_rtmp_bit_read_golomb(&br);

            /* offset for top to bottom field */
            ngx_rtmp_bit_read_golomb(&br);

            /* num ref frames in pic order */
            num_ref_frames = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);

            for (n = 0; n < num_ref_frames; n++) {

                /* offset for ref frame */
                ngx_rtmp_bit_read_golomb(&br);
            }
        }

        /* num ref frames */
        ctx->avc_ref_frames = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);

        /* gaps in frame num allowed */
        ngx_rtmp_bit_read(&br, 1);

        /* pic width in mbs - 1 */
        width = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);

        /* pic height in map units - 1 */
        height = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);

        /* frame mbs only flag */
        frame_mbs_only = (ngx_uint_t) ngx_rtmp_bit_read(&br, 1);

        if (!frame_mbs_only) {

            /* mbs adaptive frame field */
            ngx_rtmp_bit_read(&br, 1);
        }

        /* direct 8x8 inference flag */
        ngx_rtmp_bit_read(&br, 1);

        /* frame cropping */
        if (ngx_rtmp_bit_read(&br, 1)) {

            crop_left = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
            crop_right = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
            crop_top = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);
            crop_bottom = (ngx_uint_t) ngx_rtmp_bit_read_golomb(&br);

        } else {

            crop_left = 0;
            crop_right = 0;
            crop_top = 0;
            crop_bottom = 0;
        }

        ctx->width = (width + 1) * 16 - (crop_left + crop_right) * 2;
        ctx->height = (2 - frame_mbs_only) * (height + 1) * 16 -
                      (crop_top + crop_bottom) * 2;

        ngx_log_debug7(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "codec: avc header "
                       "profile=%ui, compat=%ui, level=%ui, "
                       "nal_bytes=%ui, ref_frames=%ui, width=%ui, height=%ui",
                       ctx->avc_profile, ctx->avc_compat, ctx->avc_level,
                       ctx->avc_nal_bytes, ctx->avc_ref_frames,
                       ctx->width, ctx->height);
    }
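The last step of ngx_rtmp_codec_parse_avc_header turns SPS fields into pixel dimensions. The sketch below repeats that arithmetic in isolation. The `sps_dims_t` struct and the two functions are hypothetical names for this illustration; the formulas are the ones in the listing. The sample field values for 1920x1080 are an assumption about what a typical encoder emits, and the crop multiplier of 2 is taken as-is from the source (it appears to assume 4:2:0 progressive video).

```c
#include <assert.h>   /* only needed for the checks in the usage note */

/* Hypothetical holder for the SPS fields used by the size formulas. */
typedef struct {
    unsigned  width_mbs_minus1;         /* pic width in macroblocks - 1 */
    unsigned  height_map_units_minus1;  /* pic height in map units - 1 */
    unsigned  frame_mbs_only;           /* 1 for progressive frames */
    unsigned  crop_left;
    unsigned  crop_right;
    unsigned  crop_top;
    unsigned  crop_bottom;
} sps_dims_t;

/* Width in pixels: 16 per macroblock, minus the cropped columns. */
static unsigned
sps_width(const sps_dims_t *s)
{
    return (s->width_mbs_minus1 + 1) * 16
           - (s->crop_left + s->crop_right) * 2;
}

/* Height in pixels: map units count double when fields are coded. */
static unsigned
sps_height(const sps_dims_t *s)
{
    return (2 - s->frame_mbs_only) * (s->height_map_units_minus1 + 1) * 16
           - (s->crop_top + s->crop_bottom) * 2;
}
```

With width_mbs_minus1 = 119, height_map_units_minus1 = 67, frame_mbs_only = 1 and crop_bottom = 4, the width is 120 * 16 = 1920 and the height is 68 * 16 - 4 * 2 = 1080. The cropping is needed because 1080 is not a multiple of 16.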


    Original article: https://www.cnblogs.com/jimodetiantang/p/8994061.html
