html
實時流協議(Real-TimeMessaging Protocol,RTMP)是用於互聯網上傳輸視音頻數據的網絡協議。本API提供了支持RTMP, RTMPT,RTMPE,RTMPS及以上幾種協議的變種(RTMPTE, RTMPTS)協議所需的大部分客戶端功能以及少許的服務器功能。儘管Adobe公司已經公佈了RTMP協議規範(RTMP specification),可是本工程並非經過Adobe的協議規範而是經過逆向工程的方式完成的。所以,它的運行方式可能和公佈的協議規範有所偏離,可是通常狀況下它和Adobe的客戶端的運行方式是如出一轍的。c++
本博客將對libRTMP的函數進行簡要說明。 這些函數能夠在 -lrtmp 庫中找到。其餘還有不少函數,可是尚未爲這些函數寫文檔。git
基本的操做以下文所述:github
RTMP_Alloc() :用於建立一個RTMP的結構體。web
RTMP_Init():初始化RTMP結構體。數組
RTMP_SetupURL():設置推拉流的URL。服務器
RTMP_EnableWrite(): 是否推流。網絡
RTMP_Connect():創建RTMP連接中的網絡鏈接(NetConnection)。app
RTMP_ConnectStream():創建RTMP連接中的網絡流(NetStream)。socket
RTMP_Read():讀取RTMP流的內容。
RTMP_Pause():流播放的時候能夠用於暫停和繼續。
RTMP_Seek():改變流播放的位置。
當RTMP_Read()返回0 字節的時候,表明流已經讀取完畢,然後能夠調用RTMP_Close()。
RTMP_Free():用於釋放RTMP結構體。
全部的數據都使用 FLV 格式進行傳輸。一個基本的會話須要一個RTMP URL。RTMP URL 格式以下所示:
rtmp[t][e|s]://hostname[:port][/app[/playpath]]
本流程主要爲推流示意圖,拉流與此大同小異。
源碼剖析的建議:
a. 先了解RTMP協議內容或者結合代碼一塊理解,詳情可查閱: http://www.cnblogs.com/Kingfans/p/7083100.html;
b. 先了解AMF編碼機制,詳情課查閱:http://www.cnblogs.com/Kingfans/p/7069542.html;
c. 發包過程當中請配合wireshark抓包工具一塊理解。
RTMP_Alloc()用於建立一個RTMP的結構體。
/************************************************************************************************************ * @brief 申請RTMP內存 * ************************************************************************************************************/ RTMP* RTMP_Alloc() { return calloc(1, sizeof(RTMP)); }
RTMP_Init():初始化RTMP結構體。
/************************************************************************************************************ * @brief 初始化RTMP * ************************************************************************************************************/ void RTMP_Init(RTMP *r) { #ifdef CRYPTO if (!RTMP_TLS_ctx) RTMP_TLS_Init(); #endif memset(r, 0, sizeof(RTMP)); r->m_sb.sb_socket = -1; r->m_inChunkSize = RTMP_DEFAULT_CHUNKSIZE; r->m_outChunkSize = RTMP_DEFAULT_CHUNKSIZE; r->m_bSendChunkSizeInfo = 1; r->m_nBufferMS = 30000; r->m_nClientBW = 2500000; r->m_nClientBW2 = 2; r->m_nServerBW = 2500000; r->m_fAudioCodecs = 3191.0; r->m_fVideoCodecs = 252.0; r->Link.timeout = 30; r->Link.swfAge = 30; }
RTMP_SetupURL():設置推拉流的URL。內部調用了RTMP_ParseURL主要用於解析url,詳見3.3.1。
/************************************************************************************************************ * @brief 設置推流: 地址url; * ************************************************************************************************************/ int RTMP_SetupURL(RTMP *r, char *url) { AVal opt, arg; char *p1, *p2, *ptr = strchr(url, ' '); int ret, len; unsigned int port = 0; if (ptr) { *ptr = '\0'; } len = (int)strlen(url); // 將url解析後設置到r; ret = RTMP_ParseURL(url, &r->Link.protocol, &r->Link.hostname, &port, &r->Link.playpath0, &r->Link.app); if (!ret) { return ret; } r->Link.port = port; r->Link.playpath = r->Link.playpath0; while (ptr) { *ptr++ = '\0'; p1 = ptr; p2 = strchr(p1, '='); if (!p2) { break; } opt.av_val = p1; opt.av_len = p2 - p1; *p2++ = '\0'; arg.av_val = p2; ptr = strchr(p2, ' '); if (ptr) { *ptr = '\0'; arg.av_len = ptr - p2; /* skip repeated spaces */ while (ptr[1] == ' ') { *ptr++ = '\0'; } } else { arg.av_len = (int)strlen(p2); } /* unescape */ port = arg.av_len; for (p1=p2; port >0;) { if (*p1 == '\\') { unsigned int c; if (port < 3) { return FALSE; } sscanf(p1+1, "%02x", &c); *p2++ = c; port -= 3; p1 += 3; } else { *p2++ = *p1++; port--; } } arg.av_len = p2 - arg.av_val; ret = RTMP_SetOpt(r, &opt, &arg); if (!ret) { return ret; } } if (!r->Link.tcUrl.av_len) { r->Link.tcUrl.av_val = url; if (r->Link.app.av_len) { if (r->Link.app.av_val < url + len) { /* if app is part of original url, just use it */ r->Link.tcUrl.av_len = r->Link.app.av_len + (r->Link.app.av_val - url); } else { len = r->Link.hostname.av_len + r->Link.app.av_len + sizeof("rtmpte://:65535/"); r->Link.tcUrl.av_val = malloc(len); r->Link.tcUrl.av_len = snprintf(r->Link.tcUrl.av_val, len, "%s://%.*s:%d/%.*s", RTMPProtocolStringsLower[r->Link.protocol], r->Link.hostname.av_len, r->Link.hostname.av_val, r->Link.port, r->Link.app.av_len, r->Link.app.av_val); r->Link.lFlags |= RTMP_LF_FTCU; } } else { r->Link.tcUrl.av_len = (int)strlen(url); } } #ifdef CRYPTO if ((r->Link.lFlags & RTMP_LF_SWFV) && r->Link.swfUrl.av_len) RTMP_HashSWF(r->Link.swfUrl.av_val, &r->Link.SWFSize, (unsigned char *)r->Link.SWFHash, r->Link.swfAge); #endif SocksSetup(r, &r->Link.sockshost); if (r->Link.port == 0) { if (r->Link.protocol & RTMP_FEATURE_SSL) { r->Link.port = 443; } else if (r->Link.protocol & RTMP_FEATURE_HTTP) { r->Link.port = 80; } else { r->Link.port = 1935; } } return TRUE; }
RTMP_ParseURL用於解析url,就是將url字符串內的協議類型、主機名稱、端口號、appname、playpath解析出來。
int RTMP_ParseURL(const char *url, int *protocol, AVal* host, unsigned int *port, AVal* playpath, AVal* app) { char *p, *end, *col, *ques, *slash; RTMP_Log(RTMP_LOGDEBUG, "RTMP_ParseURL"); *protocol = RTMP_PROTOCOL_RTMP; *port = 0; playpath->av_len = 0; playpath->av_val = NULL; app->av_len = 0; app->av_val = NULL; /* Old School Parsing */ /* look for usual :// pattern */ p = strstr(url, "://"); if(!p) { RTMP_Log(RTMP_LOGERROR, "RTMP URL: No :// in url!"); return FALSE; } { int len = (int)(p-url); if (len == 4 && strncasecmp(url, "rtmp", 4) == 0) { *protocol = RTMP_PROTOCOL_RTMP; } else if (len == 5 && strncasecmp(url, "rtmpt", 5) == 0) { *protocol = RTMP_PROTOCOL_RTMPT; } else if (len == 5 && strncasecmp(url, "rtmps", 5) == 0) { *protocol = RTMP_PROTOCOL_RTMPS; } else if (len == 5 && strncasecmp(url, "rtmpe", 5) == 0) { *protocol = RTMP_PROTOCOL_RTMPE; } else if (len == 5 && strncasecmp(url, "rtmfp", 5) == 0) { *protocol = RTMP_PROTOCOL_RTMFP; } else if (len == 6 && strncasecmp(url, "rtmpte", 6) == 0) { *protocol = RTMP_PROTOCOL_RTMPTE; } else if (len == 6 && strncasecmp(url, "rtmpts", 6) == 0) { *protocol = RTMP_PROTOCOL_RTMPTS; } else { RTMP_Log(RTMP_LOGWARNING, "Unknown protocol!\n"); goto parsehost; } } RTMP_Log(RTMP_LOGDEBUG, "Parsed protocol: %d", *protocol); parsehost: /* let's get the hostname */ p+=3; /* check for sudden death */ if(*p==0) { RTMP_Log(RTMP_LOGWARNING, "No hostname in URL!"); return FALSE; } end = p + strlen(p); col = strchr(p, ':'); ques = strchr(p, '?'); slash = strchr(p, '/'); { int hostlen; if (slash) { hostlen = slash - p; } else { hostlen = end - p; } if (col && col - p < hostlen) { hostlen = col - p; } if(hostlen < 256) { host->av_val = p; host->av_len = hostlen; RTMP_Log(RTMP_LOGDEBUG, "Parsed host : %.*s", hostlen, host->av_val); } else { RTMP_Log(RTMP_LOGWARNING, "Hostname exceeds 255 characters!"); } p+=hostlen; } /* get the port number if available */ if(*p == ':') { unsigned int p2; p++; p2 = atoi(p); if(p2 > 65535) { RTMP_Log(RTMP_LOGWARNING, "Invalid port number!"); } else { *port = p2; } } if(!slash) { RTMP_Log(RTMP_LOGWARNING, "No application or playpath in URL!"); return TRUE; } p = slash+1; { /* parse application * * rtmp://host[:port]/app[/appinstance][/...] * application = app[/appinstance] */ char *slash2, *slash3 = NULL, *slash4 = NULL; int applen, appnamelen; slash2 = strchr(p, '/'); if (slash2) { slash3 = strchr(slash2+1, '/'); } if (slash3) { slash4 = strchr(slash3+1, '/'); } applen = end-p; /* ondemand, pass all parameters as app */ appnamelen = applen; /* ondemand length */ if(ques && strstr(p, "slist=")) /* whatever it is, the '?' and slist= means we need to use everything as app and parse plapath from slist= */ { appnamelen = ques-p; } else if(strncmp(p, "ondemand/", 9)==0) { /* app = ondemand/foobar, only pass app=ondemand */ applen = 8; appnamelen = 8; } else /* app!=ondemand, so app is app[/appinstance] */ { if (slash4) { appnamelen = slash4-p; } else if (slash3) { appnamelen = slash3-p; } else if (slash2) { appnamelen = slash2-p; } applen = appnamelen; } app->av_val = p; app->av_len = applen; RTMP_Log(RTMP_LOGDEBUG, "Parsed app : %.*s", applen, p); p += appnamelen; } if (*p == '/') { p++; } if (end-p) { AVal av = {p, end-p}; RTMP_ParsePlaypath(&av, playpath); } return TRUE; }
RTMP_EnableWrite()設置爲推流狀態。
/************************************************************************************************************ * @brief 是否進行推流; * ************************************************************************************************************/ void RTMP_EnableWrite(RTMP *r) { r->Link.protocol |= RTMP_FEATURE_WRITE; }
RTMP_Connect():創建RTMP連接中的網絡鏈接(NetConnection)。主要分爲RTMP_Connect0(3.5.1)+ RTMP_Connect1(3.5.2)。
/************************************************************************************************************ * @brief 創建RTMP中的NetConnection * * @return 成功返回TRUE, 不然返回FALSE. ************************************************************************************************************/ int RTMP_Connect(RTMP *r, RTMPPacket *cp) { // Socket結構體; struct sockaddr_storage service; if (!r->Link.hostname.av_len) { return FALSE; } // COMODO security software sandbox blocks all DNS by returning "host not found" HOSTENT *h = gethostbyname("localhost"); if (!h && GetLastError() == WSAHOST_NOT_FOUND) { RTMP_Log(RTMP_LOGERROR, "RTMP_Connect: Connection test failed. This error is likely caused by Comodo Internet Security running OBS in sandbox mode. Please add OBS to the Comodo automatic sandbox exclusion list, restart OBS and try again (11001)."); return FALSE; } memset(&service, 0, sizeof(service)); if (r->Link.socksport) { // 加入地址信息, 使用SOCKS鏈接; if (!add_addr_info(&service, &r->Link.sockshost, r->Link.socksport)) { return FALSE; } } else { // 直接鏈接; if (!add_addr_info(&service, &r->Link.hostname, r->Link.port)) { return FALSE; } } RTMP_Log(RTMP_LOGDEBUG, "%s, 創建鏈接:第0次鏈接。開始創建Socket鏈接", __FUNCTION__); // RTMP_Connect0()主要用於創建Socket鏈接,並未開始真正的創建RTMP鏈接; if (!RTMP_Connect0(r, (struct sockaddr *)&service)) { RTMP_Log(RTMP_LOGDEBUG, "%s, 創建鏈接:第0次鏈接。創建Socket鏈接失敗", __FUNCTION__); return FALSE; } RTMP_Log(RTMP_LOGDEBUG, "%s, 創建鏈接:第0次鏈接。創建Socket鏈接成功", __FUNCTION__); r->m_bSendCounter = TRUE; // 真正創建RTMP鏈接的函數; return RTMP_Connect1(r, cp); }
RTMP_Connect0():第0次鏈接,創建socket鏈接。
/************************************************************************************************************ * @brief 第0次鏈接,創建Socket鏈接 * * @return 成功返回TRUE, 不然返回FALSE. ************************************************************************************************************/ int RTMP_Connect0(RTMP *r, struct sockaddr * service) { int on = 1; r->m_sb.sb_timedout = FALSE; r->m_pausing = 0; r->m_fDuration = 0.0; // 建立一個Socket,並把Socket序號賦值給相應變量; //r->m_sb.sb_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); r->m_sb.sb_socket = WSASocket(service->sa_family, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if (r->m_sb.sb_socket != -1) { if(r->m_bindIP.addrLen) { if (bind(r->m_sb.sb_socket, (const struct sockaddr *)&r->m_bindIP.addr, r->m_bindIP.addrLen) < 0) { int err = GetSockError(); RTMP_Log(RTMP_LOGERROR, "%s, failed to bind socket: %s (%d)", __FUNCTION__, socketerror(err), err); RTMP_Close(r); return FALSE; } } // 定義函數int connect (int sockfd, struct sockaddr* serv_addr, int addrlen); // 函數說明connect()用來將參數sockfd 的Socket連至參數serv_addr指定的網絡地址。參數addrlen爲sockaddr的結構長度。 // 鏈接; if (connect(r->m_sb.sb_socket, service, sizeof(struct sockaddr_storage)) < 0) { int err = GetSockError(); switch (err) { case 10061: { RTMP_Log(RTMP_LOGERROR, "%s is offline. Try a different server (10061).", r->Link.hostname.av_val); } break; case 10013: { RTMP_Log(RTMP_LOGERROR, "The connection is being blocked by a firewall or other security software (10013)."); } break; case 10060: { RTMP_Log(RTMP_LOGERROR, "The connection timed out. Try a different server, or check that the connection is not being blocked by a firewall or other security software (10060)."); } break; default: { RTMP_Log(RTMP_LOGERROR, "%s, failed to connect socket: %s (%d)", __FUNCTION__, socketerror(err), err); } break; } RTMP_Close(r); return FALSE; } // 指定了端口號 (注:這不是必需的); if (r->Link.socksport) { RTMP_Log(RTMP_LOGDEBUG, "%s ... SOCKS negotiation", __FUNCTION__); // 談判? 發送數據報以進行談判? ; if (!SocksNegotiate(r)) { RTMP_Log(RTMP_LOGERROR, "%s, SOCKS negotiation failed.", __FUNCTION__); RTMP_Close(r); return FALSE; } } } else { RTMP_Log(RTMP_LOGERROR, "%s, failed to create socket. Error: %d", __FUNCTION__, GetSockError()); return FALSE; } /* set timeout */ // 超時; { SET_RCVTIMEO(tv, r->Link.timeout); if (setsockopt (r->m_sb.sb_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv))) { RTMP_Log(RTMP_LOGERROR, "%s, Setting socket timeout to %ds failed!", __FUNCTION__, r->Link.timeout); } } if (!r->m_bUseNagle) { setsockopt(r->m_sb.sb_socket, IPPROTO_TCP, TCP_NODELAY, (char *)&on, sizeof(on)); } return TRUE; }
RTMP_Connect1():第1次鏈接,創建真正的rtmp鏈接。主要分爲HandShake握手(3.5.2.1)、SendConnectPacket發送命令請求(3.5.2.2)。
/************************************************************************************************************ * @brief 第1次鏈接,創建RTMP鏈接 * ************************************************************************************************************/ int RTMP_Connect1(RTMP *r, RTMPPacket *cp) { if (r->Link.protocol & RTMP_FEATURE_SSL) { #if defined(CRYPTO) && !defined(NO_SSL) TLS_client(RTMP_TLS_ctx, r->m_sb.sb_ssl); TLS_setfd(r->m_sb.sb_ssl, r->m_sb.sb_socket); if (TLS_connect(r->m_sb.sb_ssl) < 0) { RTMP_Log(RTMP_LOGERROR, "%s, TLS_Connect failed", __FUNCTION__); RTMP_Close(r); return FALSE; } #else RTMP_Log(RTMP_LOGERROR, "%s, no SSL/TLS support", __FUNCTION__); RTMP_Close(r); return FALSE; #endif } // 使用HTTP; if (r->Link.protocol & RTMP_FEATURE_HTTP) { r->m_msgCounter = 1; r->m_clientID.av_val = NULL; r->m_clientID.av_len = 0; HTTP_Post(r, RTMPT_OPEN, "", 1); if (HTTP_read(r, 1) != 0) { r->m_msgCounter = 0; RTMP_Log(RTMP_LOGDEBUG, "%s, Could not connect for handshake", __FUNCTION__); RTMP_Close(r); return 0; } r->m_msgCounter = 0; } RTMP_Log(RTMP_LOGDEBUG, "%s, ... connected, handshaking", __FUNCTION__); // 握手(C0+C1, S0+S1+S2, C1); if (!HandShake(r, TRUE)) { RTMP_Log(RTMP_LOGERROR, "%s, handshake failed.", __FUNCTION__); RTMP_Close(r); return FALSE; } RTMP_Log(RTMP_LOGDEBUG, "%s, handshaked", __FUNCTION__); if (!SendConnectPacket(r, cp)) { RTMP_Log(RTMP_LOGERROR, "%s, RTMP connect failed.", __FUNCTION__); RTMP_Close(r); return FALSE; } return TRUE; }
HandShake():握手過程實現(C0+C1, S0+S1+S2, C2), 具體查看RTMP協議規範。
/************************************************************************************************************ * @brief 客戶端握手過程; * * C0和C1放在一個buffer中發送出去,當客戶端收齊S0和S1數據後,開始發送C2; * 當客戶端收到S2後並驗證,則客戶端的握手完成; * 此處將就收到的S1原封不動的當成C2發送給服務器端; * * C0/S0 (1個字節) : 表示版本號; * C1/S1 (1536字節) : 格式爲 | time(4字節) | 0 0 0 0 (4字節) | 1528個隨機數 | * C2/S2 (1536字節) : 格式爲 | time(4字節)(對等端發送的時間) | time2 (4字節) | 1528個對等端隨機回覆 | * * @return 握手成功返回TRUE, 不然返回FALSE. ************************************************************************************************************/ static int HandShake(RTMP *r, int FP9HandShake) { int i = 0; uint32_t uptime = 0, suptime = 0; // 客戶端和服務器端時間; int bMatch = 0; char type; char strC[RTMP_SIG_SIZE + 1]; // C0+C1; char* strC1 = strC + 1; // C1; char strC2[RTMP_SIG_SIZE]; // C2; char strS2[RTMP_SIG_SIZE]; // S2 // RTMP協議版本號爲0x03, 即C0數據; strC[0] = 0x03; // not encrypted: 沒有加密; // 獲取系統時間(毫秒爲單位),將其寫入到C1中,佔4個字節; uptime = htonl(RTMP_GetTime()); memcpy(strC1, &uptime, 4); // 上次對方返回請求的時間(毫秒爲單位),將其寫入到C1中; memset(&strC1[4], 0, 4); #ifdef _DEBUG // debug版,後面的1528個隨機數簡單的都設爲0xff; for (i = 8; i < RTMP_SIG_SIZE; i++) { strC1[i] = 0xff; } #else // release版,使用rand()循環生成1528個僞隨機數; for (i = 8; i < RTMP_SIG_SIZE; i++) { strC1[i] = (char)(rand() % 256); } #endif // 發送握手數據C0和C1 if (!WriteN(r, strC, RTMP_SIG_SIZE + 1)) { return FALSE; } // 讀取數據報,長度爲1,存入type中; // 此處讀取的是服務器端發送來的S0,表示服務器使用的Rtmp版本; if (ReadN(r, &type, 1) != 1) { /* 0x03 or 0x06 (03是明文,06是加密,其餘值非法)*/ return FALSE; } RTMP_Log(RTMP_LOGDEBUG, "%s: Type Answer : %02X", __FUNCTION__, type); // 客戶端要求的版本與服務器端提供的版本不同; if (type != strC[0]) { RTMP_Log(RTMP_LOGWARNING, "%s: Type mismatch: client sent %d, server answered %d", __FUNCTION__, strC[0], type); } // 讀取服務器端發送過來的S1數據賦值給C2,並判斷隨機序列長度是否相同; if (ReadN(r, strC2, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) { return FALSE; } /* decode server response */ // 把serversig的前4個字節賦值給suptime; // S1中的time與C2中的time應該相同; memcpy(&suptime, strC2, 4); // 將網絡字節序轉化爲主機字節序,即大端轉小端; suptime = ntohl(suptime); RTMP_Log(RTMP_LOGDEBUG, "%s: Server Uptime : %d", __FUNCTION__, suptime); RTMP_Log(RTMP_LOGDEBUG, "%s: FMS Version : %d.%d.%d.%d", __FUNCTION__, strC2[4], strC2[5], strC2[6], strC2[7]); /* 2nd part of handshake */ // 發送握手數據C2(1536個字節)給服務器; if (!WriteN(r, strC2, RTMP_SIG_SIZE)) { return FALSE; } // 讀取從服務器發送過來的握手數據S2(1536個字節); if (ReadN(r, strS2, RTMP_SIG_SIZE) != RTMP_SIG_SIZE) { return FALSE; } // 比較客戶端C1和服務器端S2的1536個數是否匹配; bMatch = (memcmp(strS2, strC1, RTMP_SIG_SIZE) == 0); if (!bMatch) { RTMP_Log(RTMP_LOGWARNING, "%s, client signature does not match!", __FUNCTION__); } return TRUE; }
wireshark抓包結果:
SendConnectPacket()發送命令請求: set chunk size 和 connect(stream id)。
/************************************************************************************************************ * @brief 發送connect命令; * 這是每次程序運行的時候發送的第一個命令消息; * 命令消息由命令名,傳輸ID,和命令對象組成; * 命令對象由一系列的相關參數組成; * 可參考rtmp協議:rtmp命令消息--4.1.1節; * * set chunk size + connect(stream id) ************************************************************************************************************/ static int SendConnectPacket(RTMP *r, RTMPPacket *cp) { RTMPPacket packet; char pbuf[4096], *pend = pbuf + sizeof(pbuf); char *enc; if (cp) { return RTMP_SendPacket(r, cp, TRUE); } if((r->Link.protocol & RTMP_FEATURE_WRITE) && r->m_bSendChunkSizeInfo) { packet.m_nChannel = 0x02; packet.m_headerType = RTMP_PACKET_SIZE_LARGE; packet.m_packetType = RTMP_PACKET_TYPE_CHUNK_SIZE; packet.m_nTimeStamp = 0; packet.m_nInfoField2 = 0; packet.m_hasAbsTimestamp = 0; packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; packet.m_nBodySize = 4; enc = packet.m_body; AMF_EncodeInt32(enc, pend, r->m_outChunkSize); // 發送 set chunk size; if (!RTMP_SendPacket(r, &packet, FALSE)) { return 0; } } packet.m_nChannel = 0x03; /* control channel (invoke) */ packet.m_headerType = RTMP_PACKET_SIZE_LARGE; packet.m_packetType = RTMP_PACKET_TYPE_INVOKE; packet.m_nTimeStamp = 0; packet.m_nInfoField2 = 0; packet.m_hasAbsTimestamp = 0; packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av_connect); enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes); *enc++ = AMF_OBJECT; enc = AMF_EncodeNamedString(enc, pend, &av_app, &r->Link.app); if (!enc) { return FALSE; } if (r->Link.protocol & RTMP_FEATURE_WRITE) { enc = AMF_EncodeNamedString(enc, pend, &av_type, &av_nonprivate); if (!enc) { return FALSE; } } if (r->Link.flashVer.av_len) { enc = AMF_EncodeNamedString(enc, pend, &av_flashVer, &r->Link.flashVer); if (!enc) { return FALSE; } } if (r->Link.swfUrl.av_len) { enc = AMF_EncodeNamedString(enc, pend, &av_swfUrl, &r->Link.swfUrl); if (!enc) { return FALSE; } } if (r->Link.tcUrl.av_len) { enc = AMF_EncodeNamedString(enc, pend, &av_tcUrl, &r->Link.tcUrl); if (!enc) { return FALSE; } } if (!(r->Link.protocol & RTMP_FEATURE_WRITE)) { enc = AMF_EncodeNamedBoolean(enc, pend, &av_fpad, FALSE); if (!enc) { return FALSE; } enc = AMF_EncodeNamedNumber(enc, pend, &av_capabilities, 15.0); if (!enc) { return FALSE; } enc = AMF_EncodeNamedNumber(enc, pend, &av_audioCodecs, r->m_fAudioCodecs); if (!enc) { return FALSE; } enc = AMF_EncodeNamedNumber(enc, pend, &av_videoCodecs, r->m_fVideoCodecs); if (!enc) { return FALSE; } enc = AMF_EncodeNamedNumber(enc, pend, &av_videoFunction, 1.0); if (!enc) { return FALSE; } if (r->Link.pageUrl.av_len) { enc = AMF_EncodeNamedString(enc, pend, &av_pageUrl, &r->Link.pageUrl); if (!enc) { return FALSE; } } } if (r->m_fEncoding != 0.0 || r->m_bSendEncoding) { /* AMF0, AMF3 not fully supported yet */ enc = AMF_EncodeNamedNumber(enc, pend, &av_objectEncoding, r->m_fEncoding); if (!enc) { return FALSE; } } if (enc + 3 >= pend) { return FALSE; } *enc++ = 0; *enc++ = 0; /* end of object - 0x00 0x00 0x09 */ *enc++ = AMF_OBJECT_END; /* add auth string */ if (r->Link.auth.av_len) { enc = AMF_EncodeBoolean(enc, pend, r->Link.lFlags & RTMP_LF_AUTH); if (!enc) { return FALSE; } enc = AMF_EncodeString(enc, pend, &r->Link.auth); if (!enc) { return FALSE; } } if (r->Link.extras.o_num) { int i; for (i = 0; i < r->Link.extras.o_num; i++) { enc = AMFProp_Encode(&r->Link.extras.o_props[i], enc, pend); if (!enc) { return FALSE; } } } packet.m_nBodySize = enc - packet.m_body; // 發送 connect(streamid); return RTMP_SendPacket(r, &packet, TRUE); }
wireshark抓包結果:
RTMP_ConnectStream(): 創建RTMP中的NetStream。此處while循環接收消息並調用RTMP_ClientPacket進行處理。
/************************************************************************************************************ * @brief 創建RTMP中的NetStream * * @return 成功返回TRUE, 不然返回FALSE. ************************************************************************************************************/ int RTMP_ConnectStream(RTMP *r, int seekTime) { RTMPPacket packet = { 0 }; /* seekTime was already set by SetupStream / SetupURL. * This is only needed by ReconnectStream. */ if (seekTime > 0) { r->Link.seekTime = seekTime; } r->m_mediaChannel = 0; while (!r->m_bPlaying && RTMP_IsConnected(r) && RTMP_ReadPacket(r, &packet)) { if (RTMPPacket_IsReady(&packet)) { if (!packet.m_nBodySize) { continue; } if ((packet.m_packetType == RTMP_PACKET_TYPE_AUDIO) || (packet.m_packetType == RTMP_PACKET_TYPE_VIDEO) || (packet.m_packetType == RTMP_PACKET_TYPE_INFO)) { RTMP_Log(RTMP_LOGWARNING, "Received FLV packet before play()! Ignoring."); RTMPPacket_Free(&packet); continue; } RTMP_ClientPacket(r, &packet); RTMPPacket_Free(&packet); } } return r->m_bPlaying; }
RTMP_ClientPacket(): 處理收到的消息。
/************************************************************************************************************ * @brief 處理接收到的Chunk; * * @return 有數據返回TRUE, 無返回FALSE. ************************************************************************************************************/ int RTMP_ClientPacket(RTMP *r, RTMPPacket *packet) { int bHasMediaPacket = 0; switch (packet->m_packetType) { // RTMP消息類型ID=1 設置塊大小; case RTMP_PACKET_TYPE_CHUNK_SIZE: { /* chunk size */ RTMP_Log(RTMP_LOGDEBUG, "處理消息: chunk_size (m_packetType=1)\n"); HandleChangeChunkSize(r, packet); } break; // RTMP消息類型ID=3 致謝; case RTMP_PACKET_TYPE_BYTES_READ_REPORT: { /* bytes read report */ RTMP_Log(RTMP_LOGDEBUG, "處理消息: bytes_read_report (m_packetType=3)\n "); } break; // RTMP消息類型ID=4 用戶控制; case RTMP_PACKET_TYPE_CONTROL: { /* ctrl */ RTMP_Log(RTMP_LOGDEBUG, "處理消息: control (m_packetType=4)\n "); HandleCtrl(r, packet); } break; // RTMP消息類型ID=5 ; case RTMP_PACKET_TYPE_SERVER_BW: { /* server bw */ RTMP_Log(RTMP_LOGDEBUG, "處理消息: server_bw (m_packetType=5)\n "); HandleServerBW(r, packet); } break; // RTMP消息類型ID=6; case RTMP_PACKET_TYPE_CLIENT_BW: { /* client bw */ RTMP_Log(RTMP_LOGDEBUG, "處理消息: client_bw (m_packetType=6)\n "); HandleClientBW(r, packet); } break; // RTMP消息類型ID=8 音頻數據; case RTMP_PACKET_TYPE_AUDIO: { /* audio data */ RTMP_Log(RTMP_LOGDEBUG, "處理消息: audio (m_packetType=8)\n "); HandleAudio(r, packet); bHasMediaPacket = 1; if (!r->m_mediaChannel) { r->m_mediaChannel = packet->m_nChannel; } if (!r->m_pausing) { r->m_mediaStamp = packet->m_nTimeStamp; } } break; // RTMP消息類型ID=9 視頻數據; case RTMP_PACKET_TYPE_VIDEO: { /* video data */ RTMP_Log(RTMP_LOGDEBUG, "處理消息: video (m_packetType=9)\n "); HandleVideo(r, packet); bHasMediaPacket = 1; if (!r->m_mediaChannel) { r->m_mediaChannel = packet->m_nChannel; } if (!r->m_pausing) { r->m_mediaStamp = packet->m_nTimeStamp; } } break; // RTMP消息類型ID=15 AMF3編碼 忽略; case RTMP_PACKET_TYPE_FLEX_STREAM_SEND: { /* flex stream send */ RTMP_Log(RTMP_LOGDEBUG, "%s, flex stream send, size %u bytes, not supported, ignoring", __FUNCTION__, packet->m_nBodySize); } break; // RTMP消息類型ID=16 AMF3編碼 忽略; case RTMP_PACKET_TYPE_FLEX_SHARED_OBJECT: { /* flex shared object */ RTMP_Log(RTMP_LOGDEBUG, "%s, flex shared object, size %u bytes, not supported, ignoring", __FUNCTION__, packet->m_nBodySize); } break; // RTMP消息類型ID=17 AMF3編碼 忽略 case RTMP_PACKET_TYPE_FLEX_MESSAGE: { /* flex message */ RTMP_Log(RTMP_LOGDEBUG, "%s, flex message, size %u bytes, not fully supported", __FUNCTION__, packet->m_nBodySize); /*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */ /* some DEBUG code */ #if 0 RTMP_LIB_AMFObject obj; int nRes = obj.Decode(packet.m_body+1, packet.m_nBodySize-1); if(nRes < 0) { RTMP_Log(RTMP_LOGERROR, "%s, error decoding AMF3 packet", __FUNCTION__); /*return; */ } obj.Dump(); #endif if (HandleInvoke(r, packet->m_body + 1, packet->m_nBodySize - 1) == 1) { bHasMediaPacket = 2; } } break; // RTMP消息類型ID=18 AMF0編碼 數據消息; case RTMP_PACKET_TYPE_INFO: { /* metadata (notify) */ RTMP_Log(RTMP_LOGDEBUG, "%s, received: notify %u bytes", __FUNCTION__, packet->m_nBodySize); // 處理元數據; if (HandleMetadata(r, packet->m_body, packet->m_nBodySize)) { bHasMediaPacket = 1; } } break; // RTMP消息類型ID=19 AMF0編碼,忽略; case RTMP_PACKET_TYPE_SHARED_OBJECT: { RTMP_Log(RTMP_LOGDEBUG, "%s, shared object, not supported, ignoring", __FUNCTION__); } break; // RTMP消息類型ID=20 AMF0編碼,命令消息 case RTMP_PACKET_TYPE_INVOKE: { /* invoke */ RTMP_Log(RTMP_LOGDEBUG, "%s, received: invoke %u bytes", __FUNCTION__, packet->m_nBodySize); /*RTMP_LogHex(packet.m_body, packet.m_nBodySize); */ if (HandleInvoke(r, packet->m_body, packet->m_nBodySize) == 1) { bHasMediaPacket = 2; } } break; // RTMP消息類型ID=22 case RTMP_PACKET_TYPE_FLASH_VIDEO: { /* go through FLV packets and handle metadata packets */ unsigned int pos = 0; uint32_t nTimeStamp = packet->m_nTimeStamp; while (pos + 11 < packet->m_nBodySize) { uint32_t dataSize = AMF_DecodeInt24(packet->m_body + pos + 1); /* size without header (11) and prevTagSize (4) */ if (pos + 11 + dataSize + 4 > packet->m_nBodySize) { RTMP_Log(RTMP_LOGWARNING, "Stream corrupt?!"); break; } if (packet->m_body[pos] == 0x12) { HandleMetadata(r, packet->m_body + pos + 11, dataSize); } else if (packet->m_body[pos] == 8 || packet->m_body[pos] == 9) { nTimeStamp = AMF_DecodeInt24(packet->m_body + pos + 4); nTimeStamp |= (packet->m_body[pos + 7] << 24); } pos += (11 + dataSize + 4); } if (!r->m_pausing) r->m_mediaStamp = nTimeStamp; /* FLV tag(s) */ /*RTMP_Log(RTMP_LOGDEBUG, "%s, received: FLV tag(s) %lu bytes", __FUNCTION__, packet.m_nBodySize); */ bHasMediaPacket = 1; } break; default: { RTMP_Log(RTMP_LOGDEBUG, "%s, unknown packet type received: 0x%02x", __FUNCTION__, packet->m_packetType); #ifdef _DEBUG RTMP_LogHex(RTMP_LOGDEBUG, packet->m_body, packet->m_nBodySize); #endif } break; } return bHasMediaPacket; }
wireshark抓包結果:
RTMP_SendPacket(): 消息流發送,內含分塊處理,具體可結合RTMP協議規範。
/************************************************************************************************************ * @brief 消息流發送: 包括分塊發送實現; * ************************************************************************************************************/ int RTMP_SendPacket(RTMP *r, RTMPPacket *packet, int queue) { const RTMPPacket *prevPacket; uint32_t last = 0; int nSize; int hSize, cSize; char *header, *hptr, *hend, hbuf[RTMP_MAX_HEADER_SIZE], c; uint32_t t; char *buffer, *tbuf = NULL, *toff = NULL; int nChunkSize; int tlen; if (packet->m_nChannel >= r->m_channelsAllocatedOut) { int n = packet->m_nChannel + 10; RTMPPacket **packets = realloc(r->m_vecChannelsOut, sizeof(RTMPPacket*) * n); if (!packets) { free(r->m_vecChannelsOut); r->m_vecChannelsOut = NULL; r->m_channelsAllocatedOut = 0; return FALSE; } r->m_vecChannelsOut = packets; memset(r->m_vecChannelsOut + r->m_channelsAllocatedOut, 0, sizeof(RTMPPacket*) * (n - r->m_channelsAllocatedOut)); r->m_channelsAllocatedOut = n; } // 前一個packet存在且不是完整的ChunkMsgHeader,所以有可能須要調整塊消息頭的類型; // fmt字節; // case 0: chunk msg header 長度爲11; // case 1: chunk msg header 長度爲7; // case 2: chunk msg header 長度爲3; // case 3: chunk msg header 長度爲0; prevPacket = r->m_vecChannelsOut[packet->m_nChannel]; if (prevPacket && packet->m_headerType != RTMP_PACKET_SIZE_LARGE) { /* compress a bit by using the prev packet's attributes */ // 獲取ChunkMsgHeader類型,前一個Chunk與當前Chunk比較; if (prevPacket->m_nBodySize == packet->m_nBodySize && prevPacket->m_packetType == packet->m_packetType && packet->m_headerType == RTMP_PACKET_SIZE_MEDIUM) { // 若是先後兩個塊的大小、包類型都相同,則將塊頭類型fmt設爲2; // 便可省略消息長度、消息類型id、消息流id; // 能夠參考官方協議:流的分塊 --- 6.1.2.3節; packet->m_headerType = RTMP_PACKET_SIZE_SMALL; } if (prevPacket->m_nTimeStamp == packet->m_nTimeStamp && packet->m_headerType == RTMP_PACKET_SIZE_SMALL) { // 先後兩個塊的時間戳相同,且塊頭類型fmt爲2,則相應的時間戳也可省略,所以將塊頭類型置爲3; // 能夠參考官方協議:流的分塊 --- 6.1.2.4節; packet->m_headerType = RTMP_PACKET_SIZE_MINIMUM; } last = prevPacket->m_nTimeStamp; } // 塊頭類型fmt取值0、一、二、3; 超過3就表示出錯(fmt佔二個字節); if (packet->m_headerType > 3) /* sanity */ { RTMP_Log(RTMP_LOGERROR, "sanity failed!! trying to send header of type: 0x%02x.", (unsigned char)packet->m_headerType); return FALSE; } // 塊頭初始大小 = 基本頭(1字節) + 塊消息頭大小(11/7/3/0) = [12, 8, 4, 1]; // 塊基本頭是1-3字節,所以用變量cSize來表示剩下的0-2字節; // nSize 表示塊頭初始大小, hSize表示塊頭大小; nSize = packetSize[packet->m_headerType]; hSize = nSize; cSize = 0; t = packet->m_nTimeStamp - last; // 時間戳增量; if (packet->m_body) { // m_body是指向負載數據首地址的指針; "-"號用於指針前移; // header塊頭的首指針; hend塊頭的尾指針; header = packet->m_body - nSize; hend = packet->m_body; } else { header = hbuf + 6; hend = hbuf + sizeof(hbuf); } if (packet->m_nChannel > 319) { // 塊流id(cs id)大於319,則塊基本頭佔3個字節; cSize = 2; } else if (packet->m_nChannel > 63) { // 塊流id(cs id)在64與319之間,則塊基本頭佔2個字節; cSize = 1; } // ChunkBasicHeader的長度比初始長度還要長; if (cSize) { // header指向塊頭; header -= cSize; // hSize加上ChunkBasicHeader的長度(比初始長度多出來的長度); hSize += cSize; } // nSize>1表示塊消息頭至少有3個字節,即存在timestamp字段; // 相對TimeStamp大於0xffffff,此時須要使用ExtendTimeStamp; if (nSize > 1 && t >= 0xffffff) { header -= 4; hSize += 4; } hptr = header; // 把ChunkBasicHeader的Fmt類型左移6位; c = packet->m_headerType << 6; // 設置basic header的第一個字節值,前兩位爲fmt; // 能夠參考官方協議:流的分塊 --- 6.1.1節; switch (cSize) { case 0: { // 把ChunkBasicHeader的低6位設置成ChunkStreamID( cs id ) c |= packet->m_nChannel; } break; case 1: { // 同理 但低6位設置成000000; } break; case 2: { // 同理 但低6位設置成000001; c |= 1; } break; } // 能夠拆分紅兩句*hptr=c; hptr++; // 此時hptr指向第2個字節; *hptr++ = c; // 設置basic header的第二(三)個字節值; if (cSize) { // 將要放到第2字節的內容tmp; int tmp = packet->m_nChannel - 64; // 獲取低位存儲與第2字節; *hptr++ = tmp & 0xff; if (cSize == 2) { // ChunkBasicHeader是最大的3字節時,獲取高位存儲於最後1個字節(注意:排序使用大端序列,和主機相反); *hptr++ = tmp >> 8; } } // ChunkMsgHeader長度爲十一、七、3 都含有timestamp(3字節); if (nSize > 1) { // 將時間戳(相對或絕對)轉化爲3個字節存入hptr 若是時間戳超過0xffffff 則後面還要填入Extend Timestamp; hptr = AMF_EncodeInt24(hptr, hend, t > 0xffffff ? 0xffffff : t); } // ChunkMsgHeader長度爲十一、7,都含有 msg length + msg type id; if (nSize > 4) { // 將消息長度(msg length)轉化爲3個字節存入hptr; hptr = AMF_EncodeInt24(hptr, hend, packet->m_nBodySize); *hptr++ = packet->m_packetType; } // ChunkMsgHeader長度爲11 含有msg stream id(小端); if (nSize > 8) { hptr += EncodeInt32LE(hptr, packet->m_nInfoField2); } if (nSize > 1 && t >= 0xffffff) { hptr = AMF_EncodeInt32(hptr, hend, t); } // 到此爲止 已經將塊頭填寫好了; // 此時nSize表示負載數據的長度 buffer是指向負載數據區的指針; nSize = packet->m_nBodySize; buffer = packet->m_body; nChunkSize = r->m_outChunkSize; //Chunk大小 默認是128字節; RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d, size=%d", __FUNCTION__, r->m_sb.sb_socket, nSize); /* send all chunks in one HTTP request 使用HTTP協議 */ if (r->Link.protocol & RTMP_FEATURE_HTTP) { // nSize: Message負載長度; nChunkSize:Chunk長度; // 例nSize: 307 nChunkSize: 128 ; // 可分爲(307 + 128 - 1)/128 = 3個; // 爲何加 nChunkSize - 1? 由於除法會只取整數部分!; int chunks = (nSize+nChunkSize-1) / nChunkSize; // Chunk個數超過一個; if (chunks > 1) { // 注意: ChunkBasicHeader的長度 = cSize + 1; // 消息分n塊後總的開銷; // n個ChunkBasicHeader 1個ChunkMsgHeader 1個Message負載; // 實際上只有第一個Chunk是完整的,剩下的只有ChunkBasicHeader; tlen = chunks * (cSize + 1) + nSize + hSize; tbuf = malloc(tlen); if (!tbuf) { return FALSE; } toff = tbuf; } } // 消息的負載 + 頭; while (nSize + hSize) { int wrote; // 消息負載大小 < Chunk大小(不用分塊); if (nSize < nChunkSize) { // Chunk可能小於設定值; nChunkSize = nSize; } RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)header, hSize); RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)buffer, nChunkSize); // 若是r->Link.protocol採用Http協議 則將RTMP包數據封裝成多個Chunk 而後一次性發送; // 不然每封裝成一個塊,就當即發送出去; if (tbuf) { // 將從Chunk頭開始的nChunkSize + hSize個字節拷貝至toff中; // 這些拷貝的數據包括塊頭數據(hSize字節)和nChunkSize個負載數據; memcpy(toff, header, nChunkSize + hSize); toff += nChunkSize + hSize; } else // 負載數據長度不超過設定的塊大小 不須要分塊; 所以tbuf爲NULL或者r->Link.protocol不採用Http; { // 直接將負載數據和塊頭數據發送出去; wrote = WriteN(r, header, nChunkSize + hSize); if (!wrote) { return FALSE; } } nSize -= nChunkSize; // 消息負載長度 - Chunk負載長度; buffer += nChunkSize; // buffer指針前移1個Chunk負載長度; hSize = 0; // 若是消息負載數據尚未發完 準備填充下一個塊的塊頭數據; if (nSize > 0) { header = buffer - 1; hSize = 1; if (cSize) { header -= cSize; hSize += cSize; } *header = (0xc0 | c); if (cSize) { int tmp = packet->m_nChannel - 64; header[1] = tmp & 0xff; if (cSize == 2) { header[2] = tmp >> 8; } } } } if (tbuf) { int wrote = WriteN(r, tbuf, toff-tbuf); free(tbuf); tbuf = NULL; if (!wrote) { return FALSE; } } /* we invoked a remote method */ if (packet->m_packetType == RTMP_PACKET_TYPE_INVOKE) { AVal method; char *ptr; ptr = packet->m_body + 1; AMF_DecodeString(ptr, &method); RTMP_Log(RTMP_LOGDEBUG, "Invoking %s", method.av_val); /* keep it in call queue till result arrives */ if (queue) { int txn; ptr += 3 + method.av_len; txn = (int)AMF_DecodeNumber(ptr); AV_queue(&r->m_methodCalls, &r->m_numCalls, &method, txn); } } if (!r->m_vecChannelsOut[packet->m_nChannel]) { r->m_vecChannelsOut[packet->m_nChannel] = malloc(sizeof(RTMPPacket)); } memcpy(r->m_vecChannelsOut[packet->m_nChannel], packet, sizeof(RTMPPacket)); return TRUE; }
wireshark抓包結果: