日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

librtmp源码详解

發(fā)布時間:2023/12/31 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 librtmp源码详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  1. librtmp概述

  實時流協議(Real-TimeMessaging Protocol,RTMP)是用于互聯網上傳輸視音頻數據的網絡協議。本API提供了支持RTMP, RTMPT,RTMPE,RTMPS及以上幾種協議的變種(RTMPTE, RTMPTS)協議所需的大部分客戶端功能以及少許的服務器功能。盡管Adobe公司已經公布了RTMP協議規(guī)范(RTMP specification),可是本工程并非經過Adobe的協議規(guī)范而是經過逆向工程的方式完成的。所以,它的運行方式可能和公布的協議規(guī)范有所偏離,可是通常狀況下它和Adobe的客戶端的運行方式是如出一轍的。c++

  本博客將對libRTMP的函數進行簡要說明。 這些函數能夠在 -lrtmp 庫中找到。其余還有不少函數,可是尚未為這些函數寫文檔。git

基本的操做以下文所述:github

RTMP_Alloc() :用于建立一個RTMP的結構體。web

RTMP_Init():初始化RTMP結構體。數組

RTMP_SetupURL():設置推拉流的URL。服務器

RTMP_EnableWrite(): 是否推流。網絡

RTMP_Connect():創(chuàng)建RTMP連接中的網絡鏈接(NetConnection)。app

RTMP_ConnectStream():創(chuàng)建RTMP連接中的網絡流(NetStream)。socket

RTMP_Read():讀取RTMP流的內容。

RTMP_Pause():流播放的時候能夠用于暫停和繼續(xù)。

RTMP_Seek():改變流播放的位置。

當RTMP_Read()返回0 字節(jié)的時候,表明流已經讀取完畢,然后能夠調用RTMP_Close()。

RTMP_Free():用于釋放RTMP結構體。

全部的數據都使用 FLV 格式進行傳輸。一個基本的會話須要一個RTMP URL。RTMP URL 格式以下所示:

rtmp[t][e|s]://hostname[:port][/app[/playpath]]

  2. librtmp流程圖

  本流程主要為推流示意圖,拉流與此大同小異。

  

?

  3. 源碼剖析

  源碼剖析的建議:

    a. 先了解RTMP協議內容或者結合代碼一塊理解,詳情可查閱:?http://www.cnblogs.com/Kingfans/p/7083100.html;

    b. 先了解AMF編碼機制,詳情課查閱:http://www.cnblogs.com/Kingfans/p/7069542.html;

    c. 發(fā)包過程當中請配合wireshark抓包工具一塊理解。

  3.1 RTMP_Alloc

RTMP_Alloc()用于建立一個RTMP的結構體。

/************************************************************************************************************ * @brief 申請RTMP內存 * ************************************************************************************************************/ RTMP* RTMP_Alloc() {return calloc(1, sizeof(RTMP)); }

  3.2 RTMP_Init

RTMP_Init():初始化RTMP結構體。

/************************************************************************************************************ * @brief 初始化RTMP * ************************************************************************************************************/ void RTMP_Init(RTMP *r) { #ifdef CRYPTOif (!RTMP_TLS_ctx)RTMP_TLS_Init(); #endifmemset(r, 0, sizeof(RTMP));r->m_sb.sb_socket = -1;r->m_inChunkSize = RTMP_DEFAULT_CHUNKSIZE;r->m_outChunkSize = RTMP_DEFAULT_CHUNKSIZE;r->m_bSendChunkSizeInfo = 1;r->m_nBufferMS = 30000;r->m_nClientBW = 2500000;r->m_nClientBW2 = 2;r->m_nServerBW = 2500000;r->m_fAudioCodecs = 3191.0;r->m_fVideoCodecs = 252.0;r->Link.timeout = 30;r->Link.swfAge = 30; }

  3.3 RTMP_SetupURL

RTMP_SetupURL():設置推拉流的URL。內部調用了RTMP_ParseURL主要用于解析url,詳見3.3.1。

/************************************************************************************************************ * @brief 設置推流: 地址url; * ************************************************************************************************************/ int RTMP_SetupURL(RTMP *r, char *url) {AVal opt, arg;char *p1, *p2, *ptr = strchr(url, ' ');int ret, len;unsigned int port = 0;if (ptr){*ptr = '\0';}len = (int)strlen(url);// 將url解析后設置到r;ret = RTMP_ParseURL(url, &r->Link.protocol, &r->Link.hostname, &port, &r->Link.playpath0, &r->Link.app);if (!ret){return ret;}r->Link.port = port;r->Link.playpath = r->Link.playpath0;while (ptr){*ptr++ = '\0';p1 = ptr;p2 = strchr(p1, '=');if (!p2){break;}opt.av_val = p1;opt.av_len = p2 - p1;*p2++ = '\0';arg.av_val = p2;ptr = strchr(p2, ' ');if (ptr){*ptr = '\0';arg.av_len = ptr - p2;/* skip repeated spaces */while (ptr[1] == ' '){*ptr++ = '\0';}}else{arg.av_len = (int)strlen(p2);}/* unescape */port = arg.av_len;for (p1=p2; port >0;){if (*p1 == '\\'){unsigned int c;if (port < 3){return FALSE;}sscanf(p1+1, "%02x", &c);*p2++ = c;port -= 3;p1 += 3;}else{*p2++ = *p1++;port--;}}arg.av_len = p2 - arg.av_val;ret = RTMP_SetOpt(r, &opt, &arg);if (!ret){return ret;}}if (!r->Link.tcUrl.av_len){r->Link.tcUrl.av_val = url;if (r->Link.app.av_len){if (r->Link.app.av_val < url + len){/* if app is part of original url, just use it */r->Link.tcUrl.av_len = r->Link.app.av_len + (r->Link.app.av_val - url);}else{len = r->Link.hostname.av_len + r->Link.app.av_len + sizeof("rtmpte://:65535/");r->Link.tcUrl.av_val = malloc(len);r->Link.tcUrl.av_len = snprintf(r->Link.tcUrl.av_val, len,"%s://%.*s:%d/%.*s",RTMPProtocolStringsLower[r->Link.protocol],r->Link.hostname.av_len, r->Link.hostname.av_val,r->Link.port,r->Link.app.av_len, r->Link.app.av_val);r->Link.lFlags |= RTMP_LF_FTCU;}}else{r->Link.tcUrl.av_len = (int)strlen(url);}}#ifdef CRYPTOif ((r->Link.lFlags & RTMP_LF_SWFV) && r->Link.swfUrl.av_len)RTMP_HashSWF(r->Link.swfUrl.av_val, &r->Link.SWFSize,(unsigned char *)r->Link.SWFHash, r->Link.swfAge); #endifSocksSetup(r, &r->Link.sockshost);if (r->Link.port == 0){if (r->Link.protocol & RTMP_FEATURE_SSL){r->Link.port = 443;}else if (r->Link.protocol & RTMP_FEATURE_HTTP){r->Link.port = 80;}else{r->Link.port = 1935;}}return TRUE; }

  3.3.1 RTMP_ParseURL

RTMP_ParseURL用于解析url,就是將url字符串內的協議類型、主機名稱、端口號、appname、playpath解析出來。

int RTMP_ParseURL(const char *url, int *protocol, AVal* host, unsigned int *port,AVal* playpath, AVal* app) {char *p, *end, *col, *ques, *slash;RTMP_Log(RTMP_LOGDEBUG, "RTMP_ParseURL");*protocol = RTMP_PROTOCOL_RTMP;*port = 0;playpath->av_len = 0;playpath->av_val = NULL;app->av_len = 0;app->av_val = NULL;/* Old School Parsing *//* look for usual :// pattern */p = strstr(url, "://");if(!p){RTMP_Log(RTMP_LOGERROR, "RTMP URL: No :// in url!");return FALSE;}{int len = (int)(p-url);if (len == 4 && strncasecmp(url, "rtmp", 4) == 0){*protocol = RTMP_PROTOCOL_RTMP;}else if (len == 5 && strncasecmp(url, "rtmpt", 5) == 0){*protocol = RTMP_PROTOCOL_RTMPT;}else if (len == 5 && strncasecmp(url, "rtmps", 5) == 0){*protocol = RTMP_PROTOCOL_RTMPS;}else if (len == 5 && strncasecmp(url, "rtmpe", 5) == 0){*protocol = RTMP_PROTOCOL_RTMPE;}else if (len == 5 && strncasecmp(url, "rtmfp", 5) == 0){*protocol = RTMP_PROTOCOL_RTMFP;}else if (len == 6 && strncasecmp(url, "rtmpte", 6) == 0){*protocol = RTMP_PROTOCOL_RTMPTE;}else if (len == 6 && strncasecmp(url, "rtmpts", 6) == 0){*protocol = RTMP_PROTOCOL_RTMPTS;}else{RTMP_Log(RTMP_LOGWARNING, "Unknown protocol!\n");goto parsehost;}}RTMP_Log(RTMP_LOGDEBUG, "Parsed protocol: %d", *protocol);parsehost:/* let's get the hostname */p+=3;/* check for sudden death */if(*p==0){RTMP_Log(RTMP_LOGWARNING, "No hostname in URL!");return FALSE;}end = p + strlen(p);col = strchr(p, ':');ques = strchr(p, '?');slash = strchr(p, '/');{int hostlen;if (slash){hostlen = slash - p;}else{hostlen = end - p;}if (col && col - p < hostlen){hostlen = col - p;}if(hostlen < 256){host->av_val = p;host->av_len = hostlen;RTMP_Log(RTMP_LOGDEBUG, "Parsed host : %.*s", hostlen, host->av_val);}else{RTMP_Log(RTMP_LOGWARNING, "Hostname exceeds 255 characters!");}p+=hostlen;}/* get the port number if available */if(*p == ':'){unsigned int p2;p++;p2 = atoi(p);if(p2 > 65535){RTMP_Log(RTMP_LOGWARNING, "Invalid port number!");}else{*port = p2;}}if(!slash){RTMP_Log(RTMP_LOGWARNING, "No application or playpath in URL!");return TRUE;}p = slash+1;{/* parse application** rtmp://host[:port]/app[/appinstance][/...]* application = app[/appinstance]*/char *slash2, *slash3 = NULL, *slash4 = NULL;int applen, appnamelen;slash2 = strchr(p, '/');if (slash2){slash3 = strchr(slash2+1, '/');}if (slash3){slash4 = strchr(slash3+1, '/');}applen = end-p; /* ondemand, pass all parameters as app */appnamelen = applen; /* ondemand length */if(ques && strstr(p, "slist=")) /* whatever it is, the '?' and slist= means we need to use everything as app and parse plapath from slist= */{appnamelen = ques-p;}else if(strncmp(p, "ondemand/", 9)==0){/* app = ondemand/foobar, only pass app=ondemand */applen = 8;appnamelen = 8;}else /* app!=ondemand, so app is app[/appinstance] */{if (slash4){appnamelen = slash4-p;}else if (slash3){appnamelen = slash3-p;}else if (slash2){appnamelen = slash2-p;}applen = appnamelen;}app->av_val = p;app->av_len = applen;RTMP_Log(RTMP_LOGDEBUG, "Parsed app : %.*s", applen, p);p += appnamelen;}if (*p == '/'){p++;}if (end-p){AVal av = {p, end-p};RTMP_ParsePlaypath(&av, playpath);}return TRUE; }

  3.4 RTMP_EnableWrite

RTMP_EnableWrite()設置為推流狀態(tài)。

/************************************************************************************************************ * @brief 是否進行推流; * ************************************************************************************************************/ void RTMP_EnableWrite(RTMP *r) {r->Link.protocol |= RTMP_FEATURE_WRITE; }

  3.5 RTMP_Connect

RTMP_Connect():創(chuàng)建RTMP連接中的網絡鏈接(NetConnection)。主要分為RTMP_Connect0(3.5.1)+ RTMP_Connect1(3.5.2)。

/************************************************************************************************************ * @brief 創(chuàng)建RTMP中的NetConnection * * @return 成功返回TRUE, 不然返回FALSE. ************************************************************************************************************/ int RTMP_Connect(RTMP *r, RTMPPacket *cp) {// Socket結構體;struct sockaddr_storage service;if (!r->Link.hostname.av_len){return FALSE;}// COMODO security software sandbox blocks all DNS by returning "host not found"HOSTENT *h = gethostbyname("localhost");if (!h && GetLastError() == WSAHOST_NOT_FOUND){RTMP_Log(RTMP_LOGERROR, "RTMP_Connect: Connection test failed. This error is likely caused by Comodo Internet Security running OBS in sandbox mode. Please add OBS to the Comodo automatic sandbox exclusion list, restart OBS and try again (11001).");return FALSE;}memset(&service, 0, sizeof(service));if (r->Link.socksport){// 加入地址信息, 使用SOCKS鏈接;if (!add_addr_info(&service, &r->Link.sockshost, r->Link.socksport)){return FALSE;}}else{// 直接鏈接;if (!add_addr_info(&service, &r->Link.hostname, r->Link.port)){return FALSE;}}RTMP_Log(RTMP_LOGDEBUG, "%s, 創(chuàng)建鏈接:第0次鏈接。開始創(chuàng)建Socket鏈接", __FUNCTION__);// RTMP_Connect0()主要用于創(chuàng)建Socket鏈接,并未開始真正的創(chuàng)建RTMP鏈接;if (!RTMP_Connect0(r, (struct sockaddr *)&service)){RTMP_Log(RTMP_LOGDEBUG, "%s, 創(chuàng)建鏈接:第0次鏈接。創(chuàng)建Socket鏈接失敗", __FUNCTION__);return FALSE;}RTMP_Log(RTMP_LOGDEBUG, "%s, 創(chuàng)建鏈接:第0次鏈接。創(chuàng)建Socket鏈接成功", __FUNCTION__);r->m_bSendCounter = TRUE;// 真正創(chuàng)建RTMP鏈接的函數;return RTMP_Connect1(r, cp); }

  3.5.1 RTMP_Connect0

RTMP_Connect0():第0次鏈接,創(chuàng)建socket鏈接。

/************************************************************************************************************ * @brief 第0次鏈接,創(chuàng)建Socket鏈接 * * @return 成功返回TRUE, 不然返回FALSE. ************************************************************************************************************/ int RTMP_Connect0(RTMP *r, struct sockaddr * service) {int on = 1;r->m_sb.sb_timedout = FALSE;r->m_pausing = 0;r->m_fDuration = 0.0;// 建立一個Socket,并把Socket序號賦值給相應變量;//r->m_sb.sb_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);r->m_sb.sb_socket = WSASocket(service->sa_family, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);if (r->m_sb.sb_socket != -1){if(r->m_bindIP.addrLen){if (bind(r->m_sb.sb_socket, (const struct sockaddr *)&r->m_bindIP.addr, r->m_bindIP.addrLen) < 0){int err = GetSockError();RTMP_Log(RTMP_LOGERROR, "%s, failed to bind socket: %s (%d)",__FUNCTION__, socketerror(err), err);RTMP_Close(r);return FALSE;}}// 定義函數int connect (int sockfd, struct sockaddr* serv_addr, int addrlen);    // 函數說明connect()用來將參數sockfd 的Socket連至參數serv_addr指定的網絡地址。參數addrlen為sockaddr的結構長度。 // 鏈接; if (connect(r->m_sb.sb_socket, service, sizeof(struct sockaddr_storage)) < 0){int err = GetSockError();switch (err){case 10061:{RTMP_Log(RTMP_LOGERROR, "%s is offline. Try a different server (10061).", r->Link.hostname.av_val);}break;case 10013:{RTMP_Log(RTMP_LOGERROR, "The connection is being blocked by a firewall or other security software (10013).");}break;case 10060:{RTMP_Log(RTMP_LOGERROR, "The connection timed out. Try a different server, or check that the connection is not being blocked by a firewall or other security software (10060).");}break;default:{RTMP_Log(RTMP_LOGERROR, "%s, failed to connect socket: %s (%d)", __FUNCTION__, socketerror(err), err);}break;}RTMP_Close(r);return FALSE;}// 指定了端口號 (注:這不是必需的);if (r->Link.socksport){RTMP_Log(RTMP_LOGDEBUG, "%s ... SOCKS negotiation", __FUNCTION__);// 談判? 發(fā)送數據報以進行談判? ;if (!SocksNegotiate(r)){RTMP_Log(RTMP_LOGERROR, "%s, SOCKS negotiation failed.", __FUNCTION__);RTMP_Close(r);return FALSE;}}}else{RTMP_Log(RTMP_LOGERROR, "%s, failed to create socket. Error: %d", __FUNCTION__,GetSockError());return FALSE;}/* set timeout */// 超時;{SET_RCVTIMEO(tv, r->Link.timeout);if (setsockopt(r->m_sb.sb_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv))){RTMP_Log(RTMP_LOGERROR, "%s, Setting socket timeout to %ds failed!", __FUNCTION__, r->Link.timeout);}}if (!r->m_bUseNagle){setsockopt(r->m_sb.sb_socket, IPPROTO_TCP, TCP_NODELAY, (char *)&on, sizeof(on));}return TRUE; }

  3.5.2 RTMP_Connect1

RTMP_Connect1():第1次鏈接,創(chuàng)建真正的rtmp鏈接。主要分為HandShake握手(3.5.2.1)、SendConnectPacket發(fā)送命令請求(3.5.2.2)。

/************************************************************************************************************ * @brief 第1次鏈接,創(chuàng)建RTMP鏈接 * ************************************************************************************************************/ int RTMP_Connect1(RTMP *r, RTMPPacket *cp) {if (r->Link.protocol & RTMP_FEATURE_SSL){ #if defined(CRYPTO) && !defined(NO_SSL)TLS_client(RTMP_TLS_ctx, r->m_sb.sb_ssl);TLS_setfd(r->m_sb.sb_ssl, r->m_sb.sb_socket);if (TLS_connect(r->m_sb.sb_ssl) < 0){RTMP_Log(RTMP_LOGERROR, "%s, TLS_Connect failed", __FUNCTION__);RTMP_Close(r);return FALSE;} #elseRTMP_Log(RTMP_LOGERROR, "%s, no SSL/TLS support", __FUNCTION__);RTMP_Close(r);return FALSE;#endif}// 使用HTTP;if (r->Link.protocol & RTMP_FEATURE_HTTP){r->m_msgCounter = 1;r->m_clientID.av_val = NULL;r->m_clientID.av_len = 0;HTTP_Post(r, RTMPT_OPEN, "", 1);if (HTTP_read(r, 1) != 0){r->m_msgCounter = 0;RTMP_Log(RTMP_LOGDEBUG, "%s, Could not connect for handshake", __FUNCTION__);RTMP_Close(r);return 0;}r->m_msgCounter = 0;}RTMP_Log(RTMP_LOGDEBUG, "%s, ... connected, handshaking", __FUNCTION__);// 握手(C0+C1, S0+S1+S2, C1);if (!HandShake(r, TRUE)){RTMP_Log(RTMP_LOGERROR, "%s, handshake failed.", __FUNCTION__);RTMP_Close(r);return FALSE;}RTMP_Log(RTMP_LOGDEBUG, "%s, handshaked", __FUNCTION__);if (!SendConnectPacket(r, cp)){RTMP_Log(RTMP_LOGERROR, "%s, RTMP connect failed.", __FUNCTION__);RTMP_Close(r);return FALSE;} return TRUE; }

  3.5.2.1 HandShake

HandShake():握手過程實現(C0+C1, S0+S1+S2, C2), 具體查看RTMP協議規(guī)范。

/************************************************************************************************************ * @brief 客戶端握手過程; * * C0和C1放在一個buffer中發(fā)送出去,當客戶端收齊S0和S1數據后,開始發(fā)送C2; * 當客戶端收到S2后并驗證,則客戶端的握手完成; * 此處將就收到的S1原封不動的當成C2發(fā)送給服務器端; * * C0/S0 (1個字節(jié)) : 表示版本號; * C1/S1 (1536字節(jié)) : 格式為 | time(4字節(jié)) | 0 0 0 0 (4字節(jié)) | 1528個隨機數 | * C2/S2 (1536字節(jié)) : 格式為 | time(4字節(jié))(對等端發(fā)送的時間) | time2 (4字節(jié)) | 1528個對等端隨機回復 | * * @return 握手成功返回TRUE, 不然返回FALSE. ************************************************************************************************************/ static int HandShake(RTMP *r, int FP9HandShake) {int i = 0;uint32_t uptime = 0, suptime = 0; // 客戶端和服務器端時間;int bMatch = 0;char type;char strC[RTMP_SIG_SIZE + 1]; // C0+C1;char* strC1 = strC + 1; // C1;char strC2[RTMP_SIG_SIZE]; // C2;char strS2[RTMP_SIG_SIZE]; // S2// RTMP協議版本號為0x03, 即C0數據;strC[0] = 0x03; // not encrypted: 沒有加密;// 獲取系統時間(毫秒為單位),將其寫入到C1中,占4個字節(jié);uptime = htonl(RTMP_GetTime());memcpy(strC1, &uptime, 4);// 上次對方返回請求的時間(毫秒為單位),將其寫入到C1中;memset(&strC1[4], 0, 4);#ifdef _DEBUG// debug版,后面的1528個隨機數簡單的都設為0xff;for (i = 8; i < RTMP_SIG_SIZE; i++){strC1[i] = 0xff;} #else// release版,使用rand()循環(huán)生成1528個偽隨機數;for (i = 8; i < RTMP_SIG_SIZE; i++){strC1[i] = (char)(rand() % 256);} #endif// 發(fā)送握手數據C0和C1 if (!WriteN(r, strC, RTMP_SIG_SIZE + 1)){return FALSE;}// 讀取數據報,長度為1,存入type中;// 此處讀取的是服務器端發(fā)送來的S0,表示服務器使用的Rtmp版本;if (ReadN(r, &type, 1) != 1){/* 0x03 or 0x06 (03是明文,06是加密,其余值非法)*/return FALSE;}RTMP_Log(RTMP_LOGDEBUG, "%s: Type Answer : %02X", __FUNCTION__, type);// 客戶端要求的版本與服務器端提供的版本不同;if (type != strC[0]){RTMP_Log(RTMP_LOGWARNING, "%s: Type mismatch: client sent %d, server answered %d", __FUNCTION__, strC[0], type);}// 讀取服務器端發(fā)送過來的S1數據賦值給C2,并判斷隨機序列長度是否相同;if (ReadN(r, strC2, RTMP_SIG_SIZE) != RTMP_SIG_SIZE){return FALSE;}/* decode server response */// 把serversig的前4個字節(jié)賦值給suptime;// S1中的time與C2中的time應該相同;memcpy(&suptime, strC2, 4);// 將網絡字節(jié)序轉化為主機字節(jié)序,即大端轉小端;suptime = ntohl(suptime);RTMP_Log(RTMP_LOGDEBUG, "%s: Server Uptime : %d", __FUNCTION__, suptime);RTMP_Log(RTMP_LOGDEBUG, "%s: FMS Version : %d.%d.%d.%d", __FUNCTION__, strC2[4], strC2[5], strC2[6], strC2[7]);/* 2nd part of handshake */// 發(fā)送握手數據C2(1536個字節(jié))給服務器;if (!WriteN(r, strC2, RTMP_SIG_SIZE)){return FALSE;}// 讀取從服務器發(fā)送過來的握手數據S2(1536個字節(jié));if (ReadN(r, strS2, RTMP_SIG_SIZE) != RTMP_SIG_SIZE){return FALSE;}// 比較客戶端C1和服務器端S2的1536個數是否匹配;bMatch = (memcmp(strS2, strC1, RTMP_SIG_SIZE) == 0);if (!bMatch){RTMP_Log(RTMP_LOGWARNING, "%s, client signature does not match!", __FUNCTION__);}return TRUE; }

wireshark抓包結果:

  3.5.2.2 SendConnectPacket

SendConnectPacket()發(fā)送命令請求:? set chunk size 和 connect(stream id)。

/************************************************************************************************************ * @brief 發(fā)送connect命令; * 這是每次程序運行的時候發(fā)送的第一個命令消息; * 命令消息由命令名,傳輸ID,和命令對象組成; * 命令對象由一系列的相關參數組成; * 可參考rtmp協議:rtmp命令消息--4.1.1節(jié); * * set chunk size + connect(stream id) ************************************************************************************************************/ static int SendConnectPacket(RTMP *r, RTMPPacket *cp) {RTMPPacket packet;char pbuf[4096], *pend = pbuf + sizeof(pbuf);char *enc;if (cp){return RTMP_SendPacket(r, cp, TRUE);}if((r->Link.protocol & RTMP_FEATURE_WRITE) && r->m_bSendChunkSizeInfo){packet.m_nChannel = 0x02;packet.m_headerType = RTMP_PACKET_SIZE_LARGE;packet.m_packetType = RTMP_PACKET_TYPE_CHUNK_SIZE;packet.m_nTimeStamp = 0;packet.m_nInfoField2 = 0;packet.m_hasAbsTimestamp = 0;packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;packet.m_nBodySize = 4;enc = packet.m_body;AMF_EncodeInt32(enc, pend, r->m_outChunkSize);// 發(fā)送 set chunk size;if (!RTMP_SendPacket(r, &packet, FALSE)){return 0;}}packet.m_nChannel = 0x03; /* control channel (invoke) */packet.m_headerType = RTMP_PACKET_SIZE_LARGE;packet.m_packetType = RTMP_PACKET_TYPE_INVOKE;packet.m_nTimeStamp = 0;packet.m_nInfoField2 = 0;packet.m_hasAbsTimestamp = 0;packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;enc = packet.m_body;enc = AMF_EncodeString(enc, pend, &av_connect);enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);*enc++ = AMF_OBJECT;enc = AMF_EncodeNamedString(enc, pend, &av_app, &r->Link.app);if (!enc){return FALSE;}if (r->Link.protocol & RTMP_FEATURE_WRITE){enc = AMF_EncodeNamedString(enc, pend, &av_type, &av_nonprivate);if (!enc){return FALSE;}}if (r->Link.flashVer.av_len){enc = AMF_EncodeNamedString(enc, pend, &av_flashVer, &r->Link.flashVer);if (!enc){return FALSE;}}if (r->Link.swfUrl.av_len){enc = AMF_EncodeNamedString(enc, pend, &av_swfUrl, &r->Link.swfUrl);if (!enc){return FALSE;}}if (r->Link.tcUrl.av_len){enc = AMF_EncodeNamedString(enc, pend, &av_tcUrl, &r->Link.tcUrl);if (!enc){return FALSE;}}if (!(r->Link.protocol & RTMP_FEATURE_WRITE)){enc = AMF_EncodeNamedBoolean(enc, pend, &av_fpad, FALSE);if (!enc){return FALSE;}enc = AMF_EncodeNamedNumber(enc, pend, &av_capabilities, 15.0);if (!enc){return FALSE;}enc = AMF_EncodeNamedNumber(enc, pend, &av_audioCodecs, r->m_fAudioCodecs);if (!enc){return FALSE;}enc = AMF_EncodeNamedNumber(enc, pend, &av_videoCodecs, r->m_fVideoCodecs);if (!enc){return FALSE;}enc = AMF_EncodeNamedNumber(enc, pend, &av_videoFunction, 1.0);if (!enc){return FALSE;}if (r->Link.pageUrl.av_len){enc = AMF_EncodeNamedString(enc, pend, &av_pageUrl, &r->Link.pageUrl);if (!enc){return FALSE;}}}if (r->m_fEncoding != 0.0 || r->m_bSendEncoding){/* AMF0, AMF3 not fully supported yet */enc = AMF_EncodeNamedNumber(enc, pend, &av_objectEncoding, r->m_fEncoding);if (!enc){return FALSE;}}if (enc + 3 >= pend){return FALSE;}*enc++ = 0;*enc++ = 0; /* end of object - 0x00 0x00 0x09 */*enc++ = AMF_OBJECT_END;/* add auth string */if (r->Link.auth.av_len){enc = AMF_EncodeBoolean(enc, pend, r->Link.lFlags & RTMP_LF_AUTH);if (!enc){return FALSE;}enc = AMF_EncodeString(enc, pend, &r->Link.auth);if (!enc){return FALSE;}}if (r->Link.extras.o_num){int i;for (i = 0; i < r->Link.extras.o_num; i++){enc = AMFProp_Encode(&r->Link.extras.o_props[i], enc, pend);if (!enc){return FALSE;}}}packet.m_nBodySize = enc - packet.m_body;// 發(fā)送 connect(streamid);return RTMP_SendPacket(r, &packet, TRUE); }

wireshark抓包結果:

  3.6 RTMP_ConnectStream

RTMP_ConnectStream(): 創(chuàng)建RTMP中的NetStream。此處while循環(huán)接收消息并調用RTMP_ClientPacket進行處理。

/************************************************************************************************************ * @brief 創(chuàng)建RTMP中的NetStream * * @return 成功返回TRUE, 不然返回FALSE. ************************************************************************************************************/ int RTMP_ConnectStream(RTMP *r, int seekTime) {RTMPPacket packet = { 0 };/* seekTime was already set by SetupStream / SetupURL.* This is only needed by ReconnectStream.*/if (seekTime > 0){r->Link.seekTime = seekTime;}r->m_mediaChannel = 0;while (!r->m_bPlaying && RTMP_IsConnected(r) && RTMP_ReadPacket(r, &packet)){if (RTMPPacket_IsReady(&packet)){if (!packet.m_nBodySize){continue;}if ((packet.m_packetType == RTMP_PACKET_TYPE_AUDIO) ||(packet.m_packetType == RTMP_PACKET_TYPE_VIDEO) ||(packet.m_packetType == RTMP_PACKET_TYPE_INFO)){RTMP_Log(RTMP_LOGWARNING, "Received FLV packet before play()! Ignoring.");RTMPPacket_Free(&packet);continue;}RTMP_ClientPacket(r, &packet);RTMPPacket_Free(&packet);}}return r->m_bPlaying; }

  3.6.1 RTMP_ClientPacket

RTMP_ClientPacket():? 處理收到的消息。

/************************************************************************************************************ * @brief 處理接收到的Chunk; * * @return 有數據返回TRUE, 無返回FALSE. ************************************************************************************************************/ int RTMP_ClientPacket(RTMP *r, RTMPPacket *packet) {int bHasMediaPacket = 0;switch (packet->m_packetType){// RTMP消息類型ID=1 設置塊大小;case RTMP_PACKET_TYPE_CHUNK_SIZE:{/* chunk size */RTMP_Log(RTMP_LOGDEBUG, "處理消息: chunk_size (m_packetType=1)\n");HandleChangeChunkSize(r, packet);}break;// RTMP消息類型ID=3 致謝;case RTMP_PACKET_TYPE_BYTES_READ_REPORT:{/* bytes read report */RTMP_Log(RTMP_LOGDEBUG, "處理消息: bytes_read_report (m_packetType=3)\n ");}break;// RTMP消息類型ID=4 用戶控制;case RTMP_PACKET_TYPE_CONTROL:{/* ctrl */RTMP_Log(RTMP_LOGDEBUG, "處理消息: control (m_packetType=4)\n ");HandleCtrl(r, packet);}break;// RTMP消息類型ID=5 ;case RTMP_PACKET_TYPE_SERVER_BW:{/* server bw */RTMP_Log(RTMP_LOGDEBUG, "處理消息: server_bw (m_packetType=5)\n ");HandleServerBW(r, packet);}break;// RTMP消息類型ID=6;case RTMP_PACKET_TYPE_CLIENT_BW:{/* client bw */RTMP_Log(RTMP_LOGDEBUG, "處理消息: client_bw (m_packetType=6)\n ");HandleClientBW(r, packet);}break;// RTMP消息類型ID=8 音頻數據;case RTMP_PACKET_TYPE_AUDIO:{/* audio data */RTMP_Log(RTMP_LOGDEBUG, "處理消息: audio (m_packetType=8)\n ");HandleAudio(r, packet);bHasMediaPacket = 1;if (!r->m_mediaChannel){r->m_mediaChannel = packet->m_nChannel;}if (!r->m_pausing){r->m_mediaStamp = packet->m_nTimeStamp;}}break;// RTMP消息類型ID=9 視頻數據;case RTMP_PACKET_TYPE_VIDEO:{/* video data */RTMP_Log(RTMP_LOGDEBUG, "處理消息: video (m_packetType=9)\n ");HandleVideo(r, packet);bHasMediaPacket = 1;if (!r->m_mediaChannel){r->m_mediaChannel = packet->m_nChannel;}if (!r->m_pausing){r->m_mediaStamp = packet->m_nTimeStamp;}}break;// RTMP消息類型ID=15 AMF3編碼 忽略; case RTMP_PACKET_TYPE_FLEX_STREAM_SEND:{/* flex stream send */RTMP_Log(RTMP_LOGDEBUG, "%s, flex stream send, size %u bytes, not supported, ignoring", __FUNCTION__, packet->m_nBodySize);}break;// RTMP消息類型ID=16 AMF3編碼 忽略;case RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT:{/* flex shared object */RTMP_Log(RTMP_LOGDEBUG, "%s, flex shared object, size %u bytes, not supported, ignoring", __FUNCTION__, packet->m_nBodySize);}break;// RTMP消息類型ID=17 AMF3編碼 忽略 case RTMP_PACKET_TYPE_FLEX_MESSAGE:{/* flex message */RTMP_Log(RTMP_LOGDEBUG,"%s, flex message, size %u bytes, not fully supported",__FUNCTION__, packet->m_nBodySize);/*RTMP_LogHex(packet.m_body, packet.m_nBodySize); *//* some DEBUG code */#if 0RTMP_LIB_AMFObject obj;int nRes = obj.Decode(packet.m_body+1, packet.m_nBodySize-1);if(nRes < 0){RTMP_Log(RTMP_LOGERROR, "%s, error decoding AMF3 packet", __FUNCTION__);/*return; */}obj.Dump();#endifif (HandleInvoke(r, packet->m_body + 1, packet->m_nBodySize - 1) == 1){bHasMediaPacket = 2;}}break;// RTMP消息類型ID=18 AMF0編碼 數據消息;case RTMP_PACKET_TYPE_INFO:{/* metadata (notify) */RTMP_Log(RTMP_LOGDEBUG, "%s, received: notify %u bytes", __FUNCTION__, packet->m_nBodySize);// 處理元數據;if (HandleMetadata(r, packet->m_body, packet->m_nBodySize)){bHasMediaPacket = 1;}}break;// RTMP消息類型ID=19 AMF0編碼,忽略;case RTMP_PACKET_TYPE_SHARED_OBJECT:{RTMP_Log(RTMP_LOGDEBUG, "%s, shared object, not supported, ignoring", __FUNCTION__);}break;// RTMP消息類型ID=20 AMF0編碼,命令消息 case RTMP_PACKET_TYPE_INVOKE:{/* invoke */RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__, packet->m_nBodySize);/*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */if (HandleInvoke(r, packet->m_body, packet->m_nBodySize) == 1){bHasMediaPacket = 2;}}break;// RTMP消息類型ID=22case RTMP_PACKET_TYPE_FLASH_VIDEO:{/* go through FLV packets and handle metadata packets */unsigned int pos = 0;uint32_t nTimeStamp = packet->m_nTimeStamp;while (pos + 11 < packet->m_nBodySize){uint32_t dataSize = AMF_DecodeInt24(packet->m_body + pos + 1); /* size without header (11) and prevTagSize (4) */if (pos + 11 + dataSize + 4 > packet->m_nBodySize){RTMP_Log(RTMP_LOGWARNING, "Stream corrupt?!");break;}if (packet->m_body[pos] == 0x12){HandleMetadata(r, packet->m_body + pos + 11, dataSize);}else if (packet->m_body[pos] == 8 || packet->m_body[pos] == 9){nTimeStamp = AMF_DecodeInt24(packet->m_body + pos + 4);nTimeStamp |= (packet->m_body[pos + 7] << 24);}pos += (11 + dataSize + 4);}if (!r->m_pausing)r->m_mediaStamp = nTimeStamp;/* FLV tag(s) *//*RTMP_Log(RTMP_LOGDEBUG, "%s, received: FLV tag(s) %lu bytes", __FUNCTION__, packet.m_nBodySize); */bHasMediaPacket = 1;}break;default:{RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__,packet->m_packetType); #ifdef _DEBUGRTMP_LogHex(RTMP_LOGDEBUG, packet->m_body, packet->m_nBodySize); #endif}break;}return bHasMediaPacket; }

wireshark抓包結果:

  3.7 RTMP_SendPacket

RTMP_SendPacket():? 消息流發(fā)送,內含分塊處理,具體可結合RTMP協議規(guī)范。

/************************************************************************************************************ * @brief 消息流發(fā)送: 包括分塊發(fā)送實現; * ************************************************************************************************************/ int RTMP_SendPacket(RTMP *r, RTMPPacket *packet, int queue) {const RTMPPacket *prevPacket;uint32_t last = 0;int nSize;int hSize, cSize;char *header, *hptr, *hend, hbuf[RTMP_MAX_HEADER_SIZE], c;uint32_t t;char *buffer, *tbuf = NULL, *toff = NULL;int nChunkSize;int tlen;if (packet->m_nChannel >= r->m_channelsAllocatedOut){int n = packet->m_nChannel + 10;RTMPPacket **packets = realloc(r->m_vecChannelsOut, sizeof(RTMPPacket*) * n);if (!packets){free(r->m_vecChannelsOut);r->m_vecChannelsOut = NULL;r->m_channelsAllocatedOut = 0;return FALSE;}r->m_vecChannelsOut = packets;memset(r->m_vecChannelsOut + r->m_channelsAllocatedOut, 0, sizeof(RTMPPacket*) * (n - r->m_channelsAllocatedOut));r->m_channelsAllocatedOut = n;}// 前一個packet存在且不是完整的ChunkMsgHeader,所以有可能須要調整塊消息頭的類型;// fmt字節(jié);// case 0: chunk msg header 長度為11;// case 1: chunk msg header 長度為7;// case 2: chunk msg header 長度為3;// case 3: chunk msg header 長度為0;prevPacket = r->m_vecChannelsOut[packet->m_nChannel];if (prevPacket && packet->m_headerType != RTMP_PACKET_SIZE_LARGE){/* compress a bit by using the prev packet's attributes */// 獲取ChunkMsgHeader類型,前一個Chunk與當前Chunk比較;if (prevPacket->m_nBodySize == packet->m_nBodySize&& prevPacket->m_packetType == packet->m_packetType&& packet->m_headerType == RTMP_PACKET_SIZE_MEDIUM){// 若是先后兩個塊的大小、包類型都相同,則將塊頭類型fmt設為2; // 便可省略消息長度、消息類型id、消息流id; // 能夠參考官方協議:流的分塊 --- 6.1.2.3節(jié);packet->m_headerType = RTMP_PACKET_SIZE_SMALL;}if (prevPacket->m_nTimeStamp == packet->m_nTimeStamp&& packet->m_headerType == RTMP_PACKET_SIZE_SMALL){// 先后兩個塊的時間戳相同,且塊頭類型fmt為2,則相應的時間戳也可省略,所以將塊頭類型置為3;// 能夠參考官方協議:流的分塊 --- 6.1.2.4節(jié); packet->m_headerType = RTMP_PACKET_SIZE_MINIMUM;}last = prevPacket->m_nTimeStamp;}// 塊頭類型fmt取值0、一、二、3; 超過3就表示出錯(fmt占二個字節(jié));if (packet->m_headerType > 3) /* sanity */{RTMP_Log(RTMP_LOGERROR, "sanity failed!! trying to send header of type: 0x%02x.", (unsigned char)packet->m_headerType);return FALSE;}// 塊頭初始大小 = 基本頭(1字節(jié)) + 塊消息頭大小(11/7/3/0) = [12, 8, 4, 1]; // 塊基本頭是1-3字節(jié),所以用變量cSize來表示剩下的0-2字節(jié);// nSize 表示塊頭初始大小, hSize表示塊頭大小; nSize = packetSize[packet->m_headerType];hSize = nSize;cSize = 0;t = packet->m_nTimeStamp - last; // 時間戳增量;if (packet->m_body){// m_body是指向負載數據首地址的指針; "-"號用于指針前移;// header塊頭的首指針; hend塊頭的尾指針;header = packet->m_body - nSize;hend = packet->m_body;}else{header = hbuf + 6;hend = hbuf + sizeof(hbuf);}if (packet->m_nChannel > 319){// 塊流id(cs id)大于319,則塊基本頭占3個字節(jié);cSize = 2;}else if (packet->m_nChannel > 63){// 塊流id(cs id)在64與319之間,則塊基本頭占2個字節(jié);cSize = 1;}// ChunkBasicHeader的長度比初始長度還要長;if (cSize){// header指向塊頭;header -= cSize;// hSize加上ChunkBasicHeader的長度(比初始長度多出來的長度); hSize += cSize;}// nSize>1表示塊消息頭至少有3個字節(jié),即存在timestamp字段;// 相對TimeStamp大于0xffffff,此時須要使用ExtendTimeStamp;if (nSize > 1 && t >= 0xffffff){header -= 4;hSize += 4;}hptr = header;// 把ChunkBasicHeader的Fmt類型左移6位;c = packet->m_headerType << 6;// 設置basic header的第一個字節(jié)值,前兩位為fmt;// 能夠參考官方協議:流的分塊 --- 6.1.1節(jié);switch (cSize){case 0:{// 把ChunkBasicHeader的低6位設置成ChunkStreamID( cs id ) c |= packet->m_nChannel;}break;case 1:{// 同理 但低6位設置成000000;}break;case 2:{// 同理 但低6位設置成000001;c |= 1;}break;}// 能夠拆分紅兩句*hptr=c; hptr++;// 此時hptr指向第2個字節(jié);*hptr++ = c;// 設置basic header的第二(三)個字節(jié)值;if (cSize){// 將要放到第2字節(jié)的內容tmp;int tmp = packet->m_nChannel - 64;// 獲取低位存儲與第2字節(jié);*hptr++ = tmp & 0xff;if (cSize == 2){// ChunkBasicHeader是最大的3字節(jié)時,獲取高位存儲于最后1個字節(jié)(注意:排序使用大端序列,和主機相反);*hptr++ = tmp >> 8;}}// ChunkMsgHeader長度為十一、七、3 都含有timestamp(3字節(jié));if (nSize > 1){// 將時間戳(相對或絕對)轉化為3個字節(jié)存入hptr 若是時間戳超過0xffffff 則后面還要填入Extend Timestamp;hptr = AMF_EncodeInt24(hptr, hend, t > 0xffffff ? 0xffffff : t);}// ChunkMsgHeader長度為十一、7,都含有 msg length + msg type id;if (nSize > 4){// 將消息長度(msg length)轉化為3個字節(jié)存入hptr;hptr = AMF_EncodeInt24(hptr, hend, packet->m_nBodySize);*hptr++ = packet->m_packetType;}// ChunkMsgHeader長度為11 含有msg stream id(小端);if (nSize > 8){hptr += EncodeInt32LE(hptr, packet->m_nInfoField2);}if (nSize > 1 && t >= 0xffffff){hptr = AMF_EncodeInt32(hptr, hend, t);}// 到此為止 已經將塊頭填寫好了; // 此時nSize表示負載數據的長度 buffer是指向負載數據區(qū)的指針;nSize = packet->m_nBodySize;buffer = packet->m_body;nChunkSize = r->m_outChunkSize; //Chunk大小 默認是128字節(jié);RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d, size=%d", __FUNCTION__, r->m_sb.sb_socket, nSize);/* send all chunks in one HTTP request 使用HTTP協議 */if (r->Link.protocol & RTMP_FEATURE_HTTP){// nSize: Message負載長度; nChunkSize:Chunk長度; // 例nSize: 307 nChunkSize: 128 ; // 可分為(307 + 128 - 1)/128 = 3個;// 為何加 nChunkSize - 1? 由于除法會只取整數部分!;int chunks = (nSize+nChunkSize-1) / nChunkSize;// Chunk個數超過一個;if (chunks > 1){ // 注意: ChunkBasicHeader的長度 = cSize + 1;// 消息分n塊后總的開銷; // n個ChunkBasicHeader 1個ChunkMsgHeader 1個Message負載;// 實際上只有第一個Chunk是完整的,剩下的只有ChunkBasicHeader;tlen = chunks * (cSize + 1) + nSize + hSize;tbuf = malloc(tlen);if (!tbuf){return FALSE;}toff = tbuf;}}// 消息的負載 + 頭;while (nSize + hSize){int wrote;// 消息負載大小 < Chunk大小(不用分塊);if (nSize < nChunkSize){// Chunk可能小于設定值;nChunkSize = nSize; }RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)header, hSize);RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)buffer, nChunkSize);// 若是r->Link.protocol采用Http協議 則將RTMP包數據封裝成多個Chunk 而后一次性發(fā)送; // 不然每封裝成一個塊,就當即發(fā)送出去;if (tbuf){// 將從Chunk頭開始的nChunkSize + hSize個字節(jié)拷貝至toff中;// 這些拷貝的數據包括塊頭數據(hSize字節(jié))和nChunkSize個負載數據;memcpy(toff, header, nChunkSize + hSize);toff += nChunkSize + hSize;}else// 負載數據長度不超過設定的塊大小 不須要分塊; 所以tbuf為NULL或者r->Link.protocol不采用Http;{// 直接將負載數據和塊頭數據發(fā)送出去;wrote = WriteN(r, header, nChunkSize + hSize);if (!wrote){return FALSE;}}nSize -= nChunkSize; // 消息負載長度 - Chunk負載長度;buffer += nChunkSize; // buffer指針前移1個Chunk負載長度;hSize = 0;// 若是消息負載數據尚未發(fā)完 準備填充下一個塊的塊頭數據; if (nSize > 0){header = buffer - 1;hSize = 1;if (cSize){header -= cSize;hSize += cSize;}*header = (0xc0 | c);if (cSize){int tmp = packet->m_nChannel - 64;header[1] = tmp & 0xff;if (cSize == 2){header[2] = tmp >> 8;}}}}if (tbuf){int wrote = WriteN(r, tbuf, toff-tbuf);free(tbuf);tbuf = NULL;if (!wrote){return FALSE;}}/* we invoked a remote method */if (packet->m_packetType == RTMP_PACKET_TYPE_INVOKE){AVal method;char *ptr;ptr = packet->m_body + 1;AMF_DecodeString(ptr, &method);RTMP_Log(RTMP_LOGDEBUG, "Invoking %s", method.av_val);/* keep it in call queue till result arrives */if (queue){int txn;ptr += 3 + method.av_len;txn = (int)AMF_DecodeNumber(ptr);AV_queue(&r->m_methodCalls, &r->m_numCalls, &method, txn);}}if (!r->m_vecChannelsOut[packet->m_nChannel]){r->m_vecChannelsOut[packet->m_nChannel] = malloc(sizeof(RTMPPacket));}memcpy(r->m_vecChannelsOut[packet->m_nChannel], packet, sizeof(RTMPPacket));return TRUE; }

wireshark抓包結果:

源碼下載地址:?https://github.com/kingsunc/RTMP_Test

總結

以上是生活随笔為你收集整理的librtmp源码详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。