欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

RTMPdump(libRTMP) 源代码分析 9: 接收消息(Message)(接收视音频数据)

发布时间:2025/4/16 44 豆豆
生活随笔 收集整理的这篇文章主要介绍了 RTMPdump(libRTMP) 源代码分析 9: 接收消息(Message)(接收视音频数据) 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

2019独角兽企业重金招聘Python工程师标准>>>

注:此前写了一些列的分析RTMPdump(libRTMP)源代码的文章,在此列一个列表:
RTMPdump 源代码分析 1: main()函数
RTMPDump(libRTMP)源代码分析 2:解析RTMP地址——RTMP_ParseURL()
RTMPdump(libRTMP) 源代码分析 3: AMF编码
RTMPdump(libRTMP)源代码分析 4: 连接第一步——握手(Hand Shake)
RTMPdump(libRTMP) 源代码分析 5: 建立一个流媒体连接 (NetConnection部分)
RTMPdump(libRTMP) 源代码分析 6: 建立一个流媒体连接 (NetStream部分 1)
RTMPdump(libRTMP) 源代码分析 7: 建立一个流媒体连接 (NetStream部分 2)
RTMPdump(libRTMP) 源代码分析 8: 发送消息(Message)
RTMPdump(libRTMP) 源代码分析 9: 接收消息(Message)(接收视音频数据)
RTMPdump(libRTMP) 源代码分析 10: 处理各种消息(Message)

===============================


前一篇文章分析了RTMPdump(libRTMP) 的发送消息(Message)方面的源代码:RTMPdump(libRTMP) 源代码分析 8: 发送消息(Message)

在这里在研究研究接收消息(Message)的源代码,接收消息最典型的应用就是接收视音频数据了,因为视频和音频分别都属于RTMP协议规范中的一种消息。在这里主要分析接收视音频数据。

RTMPdump中完成视音频数据的接收(也可以说是视音频数据的下载)的函数是:RTMP_Read()。

RTMPdump主程序中的Download()函数就是通过调用RTMP_Read()完成数据接收,从而实现下载的。

那么我们马上开始吧,首先看看RTMP_Read()函数:

//FLV文件头 static const char flvHeader[] = { 'F', 'L', 'V', 0x01,0x00, /* 0x04代表有音频, 0x01代表有视频 */0x00, 0x00, 0x00, 0x09,0x00, 0x00, 0x00, 0x00 };#define HEADERBUF (128*1024) int RTMP_Read(RTMP *r, char *buf, int size) {int nRead = 0, total = 0;/* can't continue */ fail:switch (r->m_read.status) {case RTMP_READ_EOF:case RTMP_READ_COMPLETE:return 0;case RTMP_READ_ERROR:  /* corrupted stream, resume failed */SetSockError(EINVAL);return -1;default:break;}/* first time thru */if (!(r->m_read.flags & RTMP_READ_HEADER)){if (!(r->m_read.flags & RTMP_READ_RESUME)){//分配内存,指向buf的首部和尾部char *mybuf = (char *) malloc(HEADERBUF), *end = mybuf + HEADERBUF;int cnt = 0;//buf指向同一地址r->m_read.buf = mybuf;r->m_read.buflen = HEADERBUF;//把Flv的首部复制到mybuf指向的内存//RTMP传递的多媒体数据是“砍头”的FLV文件memcpy(mybuf, flvHeader, sizeof(flvHeader));//m_read.buf指针后移flvheader个单位r->m_read.buf += sizeof(flvHeader);//buf长度增加flvheader长度r->m_read.buflen -= sizeof(flvHeader);//timestamp=0,不是多媒体数据while (r->m_read.timestamp == 0){//读取一个Packet,到r->m_read.buf//nRead为读取结果标记nRead = Read_1_Packet(r, r->m_read.buf, r->m_read.buflen);//有错误if (nRead < 0){free(mybuf);r->m_read.buf = NULL;r->m_read.buflen = 0;r->m_read.status = nRead;goto fail;}/* buffer overflow, fix buffer and give up */if (r->m_read.buf < mybuf || r->m_read.buf > end) {mybuf = (char *) realloc(mybuf, cnt + nRead);memcpy(mybuf+cnt, r->m_read.buf, nRead);r->m_read.buf = mybuf+cnt+nRead;break;}////记录读取的字节数cnt += nRead;//m_read.buf指针后移nRead个单位r->m_read.buf += nRead;r->m_read.buflen -= nRead;//当dataType=00000101时,即有视频和音频时//说明有多媒体数据了if (r->m_read.dataType == 5)break;}//读入数据类型//注意:mybuf指针位置一直没动//mybuf[4]中第 6 位表示是否存在音频Tag。第 8 位表示是否存在视频Tag。 mybuf[4] = r->m_read.dataType;//两个指针之间的差r->m_read.buflen = r->m_read.buf - mybuf;r->m_read.buf = mybuf;//这句很重要!后面memcopyr->m_read.bufpos = mybuf;}//flags标明已经读完了文件头r->m_read.flags |= RTMP_READ_HEADER;}if ((r->m_read.flags & RTMP_READ_SEEKING) && r->m_read.buf){/* drop whatever's here */free(r->m_read.buf);r->m_read.buf = NULL;r->m_read.bufpos = NULL;r->m_read.buflen = 0;}/* If there's leftover data buffered, use it up */if (r->m_read.buf){nRead = r->m_read.buflen;if (nRead > size)nRead = size;//m_read.bufpos指向mybufmemcpy(buf, r->m_read.bufpos, nRead);r->m_read.buflen -= nRead;if (!r->m_read.buflen){free(r->m_read.buf);r->m_read.buf = NULL;r->m_read.bufpos = NULL;}else{r->m_read.bufpos += nRead;}buf += nRead;total += nRead;size -= nRead;}//接着读while (size > 0 && (nRead = Read_1_Packet(r, buf, size)) >= 0){if (!nRead) continue;buf += nRead;total += nRead;size -= nRead;break;}if (nRead < 0)r->m_read.status = nRead;if (size < 0)total += size;return total; }


程序关键的地方都已经注释上了代码,在此就不重复说明了。有一点要提一下:RTMP传送的视音频数据的格式和FLV(FLash Video)格式是一样的,把接收下来的数据直接存入文件就可以了。但是这些视音频数据没有文件头,是纯视音频数据,因此需要在其前面加上FLV格式的文件头,这样得到的数据存成文件后才能被一般的视频播放器所播放。FLV格式的文件头是13个字节,如代码中所示。


RTMP_Read()中实际读取数据的函数是Read_1_Packet(),它的功能是从网络上读取一个RTMPPacket的数据,来看看它的源代码吧:

/* 从流媒体中读取多媒体packet。* Returns -3 if Play.Close/Stop, -2 if fatal error, -1 if no more media* packets, 0 if ignorable error, >0 if there is a media packet*/ static int Read_1_Packet(RTMP *r, char *buf, unsigned int buflen) {uint32_t prevTagSize = 0;int rtnGetNextMediaPacket = 0, ret = RTMP_READ_EOF;RTMPPacket packet = { 0 };int recopy = FALSE;unsigned int size;char *ptr, *pend;uint32_t nTimeStamp = 0;unsigned int len;//获取下一个packetrtnGetNextMediaPacket = RTMP_GetNextMediaPacket(r, &packet);while (rtnGetNextMediaPacket){char *packetBody = packet.m_body;unsigned int nPacketLen = packet.m_nBodySize;/* Return -3 if this was completed nicely with invoke message* Play.Stop or Play.Complete*/if (rtnGetNextMediaPacket == 2){RTMP_Log(RTMP_LOGDEBUG,"Got Play.Complete or Play.Stop from server. ""Assuming stream is complete");ret = RTMP_READ_COMPLETE;break;}//设置dataTyper->m_read.dataType |= (((packet.m_packetType == 0x08) << 2) |(packet.m_packetType == 0x09));//MessageID为9时,为视频数据,数据太小时。。。if (packet.m_packetType == 0x09 && nPacketLen <= 5){RTMP_Log(RTMP_LOGDEBUG, "ignoring too small video packet: size: %d",nPacketLen);ret = RTMP_READ_IGNORE;break;}//MessageID为8时,为音频数据,数据太小时。。。if (packet.m_packetType == 0x08 && nPacketLen <= 1){RTMP_Log(RTMP_LOGDEBUG, "ignoring too small audio packet: size: %d",nPacketLen);ret = RTMP_READ_IGNORE;break;}if (r->m_read.flags & RTMP_READ_SEEKING){ret = RTMP_READ_IGNORE;break;} #ifdef _DEBUGRTMP_Log(RTMP_LOGDEBUG, "type: %02X, size: %d, TS: %d ms, abs TS: %d",packet.m_packetType, nPacketLen, packet.m_nTimeStamp,packet.m_hasAbsTimestamp);if (packet.m_packetType == 0x09)RTMP_Log(RTMP_LOGDEBUG, "frametype: %02X", (*packetBody & 0xf0)); #endifif (r->m_read.flags & RTMP_READ_RESUME){/* check the header if we get one *///此类packet的timestamp都是0if (packet.m_nTimeStamp == 0){//messageID=18,数据消息(AMF0)if (r->m_read.nMetaHeaderSize > 0&& packet.m_packetType == 0x12){//获取metadataAMFObject metaObj;int nRes =AMF_Decode(&metaObj, packetBody, nPacketLen, FALSE);if (nRes >= 0){AVal metastring;AMFProp_GetString(AMF_GetProp(&metaObj, NULL, 0),&metastring);if (AVMATCH(&metastring, &av_onMetaData)){/* compare */if ((r->m_read.nMetaHeaderSize != nPacketLen) ||(memcmp(r->m_read.metaHeader, packetBody,r->m_read.nMetaHeaderSize) != 0)){ret = RTMP_READ_ERROR;}}AMF_Reset(&metaObj);if (ret == RTMP_READ_ERROR)break;}}/* check first keyframe to make sure we got the right position* in the stream! (the first non ignored frame)*/if (r->m_read.nInitialFrameSize > 0){/* video or audio data */if (packet.m_packetType == r->m_read.initialFrameType&& r->m_read.nInitialFrameSize == nPacketLen){/* we don't compare the sizes since the packet can* contain several FLV packets, just make sure the* first frame is our keyframe (which we are going* to rewrite)*/if (memcmp(r->m_read.initialFrame, packetBody,r->m_read.nInitialFrameSize) == 0){RTMP_Log(RTMP_LOGDEBUG, "Checked keyframe successfully!");r->m_read.flags |= RTMP_READ_GOTKF;/* ignore it! (what about audio data after it? it is* handled by ignoring all 0ms frames, see below)*/ret = RTMP_READ_IGNORE;break;}}/* hande FLV streams, even though the server resends the* keyframe as an extra video packet it is also included* in the first FLV stream chunk and we have to compare* it and filter it out !!*///MessageID=22,聚合消息if (packet.m_packetType == 0x16){/* basically we have to find the keyframe with the* correct TS being nResumeTS*/unsigned int pos = 0;uint32_t ts = 0;while (pos + 11 < nPacketLen){/* size without header (11) and prevTagSize (4) */uint32_t dataSize =AMF_DecodeInt24(packetBody + pos + 1);ts = AMF_DecodeInt24(packetBody + pos + 4);ts |= (packetBody[pos + 7] << 24);#ifdef _DEBUGRTMP_Log(RTMP_LOGDEBUG,"keyframe search: FLV Packet: type %02X, dataSize: %d, timeStamp: %d ms",packetBody[pos], dataSize, ts); #endif/* ok, is it a keyframe?:* well doesn't work for audio!*/if (packetBody[pos /*6928, test 0 */ ] ==r->m_read.initialFrameType/* && (packetBody[11]&0xf0) == 0x10 */ ){if (ts == r->m_read.nResumeTS){RTMP_Log(RTMP_LOGDEBUG,"Found keyframe with resume-keyframe timestamp!");if (r->m_read.nInitialFrameSize != dataSize|| memcmp(r->m_read.initialFrame,packetBody + pos + 11,r->m_read.nInitialFrameSize) != 0){RTMP_Log(RTMP_LOGERROR,"FLV Stream: Keyframe doesn't match!");ret = RTMP_READ_ERROR;break;}r->m_read.flags |= RTMP_READ_GOTFLVK;/* skip this packet?* check whether skippable:*/if (pos + 11 + dataSize + 4 > nPacketLen){RTMP_Log(RTMP_LOGWARNING,"Non skipable packet since it doesn't end with chunk, stream corrupt!");ret = RTMP_READ_ERROR;break;}packetBody += (pos + 11 + dataSize + 4);nPacketLen -= (pos + 11 + dataSize + 4);goto stopKeyframeSearch;}else if (r->m_read.nResumeTS < ts){/* the timestamp ts will only increase with* further packets, wait for seek*/goto stopKeyframeSearch;}}pos += (11 + dataSize + 4);}if (ts < r->m_read.nResumeTS){RTMP_Log(RTMP_LOGERROR,"First packet does not contain keyframe, all ""timestamps are smaller than the keyframe ""timestamp; probably the resume seek failed?");}stopKeyframeSearch:;if (!(r->m_read.flags & RTMP_READ_GOTFLVK)){RTMP_Log(RTMP_LOGERROR,"Couldn't find the seeked keyframe in this chunk!");ret = RTMP_READ_IGNORE;break;}}}}if (packet.m_nTimeStamp > 0&& (r->m_read.flags & (RTMP_READ_GOTKF|RTMP_READ_GOTFLVK))){/* another problem is that the server can actually change from* 09/08 video/audio packets to an FLV stream or vice versa and* our keyframe check will prevent us from going along with the* new stream if we resumed.** in this case set the 'found keyframe' variables to true.* We assume that if we found one keyframe somewhere and were* already beyond TS > 0 we have written data to the output* which means we can accept all forthcoming data including the* change between 08/09 <-> FLV packets*/r->m_read.flags |= (RTMP_READ_GOTKF|RTMP_READ_GOTFLVK);}/* skip till we find our keyframe* (seeking might put us somewhere before it)*/if (!(r->m_read.flags & RTMP_READ_GOTKF) &&packet.m_packetType != 0x16){RTMP_Log(RTMP_LOGWARNING,"Stream does not start with requested frame, ignoring data... ");r->m_read.nIgnoredFrameCounter++;if (r->m_read.nIgnoredFrameCounter > MAX_IGNORED_FRAMES)ret = RTMP_READ_ERROR; /* fatal error, couldn't continue stream */elseret = RTMP_READ_IGNORE;break;}/* ok, do the same for FLV streams */if (!(r->m_read.flags & RTMP_READ_GOTFLVK) &&packet.m_packetType == 0x16){RTMP_Log(RTMP_LOGWARNING,"Stream does not start with requested FLV frame, ignoring data... ");r->m_read.nIgnoredFlvFrameCounter++;if (r->m_read.nIgnoredFlvFrameCounter > MAX_IGNORED_FRAMES)ret = RTMP_READ_ERROR;elseret = RTMP_READ_IGNORE;break;}/* we have to ignore the 0ms frames since these are the first* keyframes; we've got these so don't mess around with multiple* copies sent by the server to us! (if the keyframe is found at a* later position there is only one copy and it will be ignored by* the preceding if clause)*/if (!(r->m_read.flags & RTMP_READ_NO_IGNORE) &&packet.m_packetType != 0x16){ /* exclude type 0x16 (FLV) since it can* contain several FLV packets */if (packet.m_nTimeStamp == 0){ret = RTMP_READ_IGNORE;break;}else{/* stop ignoring packets */r->m_read.flags |= RTMP_READ_NO_IGNORE;}}}/* calculate packet size and allocate slop buffer if necessary */size = nPacketLen +((packet.m_packetType == 0x08 || packet.m_packetType == 0x09|| packet.m_packetType == 0x12) ? 11 : 0) +(packet.m_packetType != 0x16 ? 4 : 0);if (size + 4 > buflen){/* the extra 4 is for the case of an FLV stream without a last* prevTagSize (we need extra 4 bytes to append it) */r->m_read.buf = (char *) malloc(size + 4);if (r->m_read.buf == 0){RTMP_Log(RTMP_LOGERROR, "Couldn't allocate memory!");ret = RTMP_READ_ERROR; /* fatal error */break;}recopy = TRUE;ptr = r->m_read.buf;}else{ptr = buf;}pend = ptr + size + 4;/* use to return timestamp of last processed packet *//* audio (0x08), video (0x09) or metadata (0x12) packets :* construct 11 byte header then add rtmp packet's data */if (packet.m_packetType == 0x08 || packet.m_packetType == 0x09|| packet.m_packetType == 0x12){nTimeStamp = r->m_read.nResumeTS + packet.m_nTimeStamp;prevTagSize = 11 + nPacketLen;*ptr = packet.m_packetType;ptr++;ptr = AMF_EncodeInt24(ptr, pend, nPacketLen);#if 0if(packet.m_packetType == 0x09) { /* video *//* H264 fix: */if((packetBody[0] & 0x0f) == 7) { /* CodecId = H264 */uint8_t packetType = *(packetBody+1);uint32_t ts = AMF_DecodeInt24(packetBody+2); /* composition time */int32_t cts = (ts+0xff800000)^0xff800000;RTMP_Log(RTMP_LOGDEBUG, "cts  : %d\n", cts);nTimeStamp -= cts;/* get rid of the composition time */CRTMP::EncodeInt24(packetBody+2, 0);}RTMP_Log(RTMP_LOGDEBUG, "VIDEO: nTimeStamp: 0x%08X (%d)\n", nTimeStamp, nTimeStamp);} #endifptr = AMF_EncodeInt24(ptr, pend, nTimeStamp);*ptr = (char)((nTimeStamp & 0xFF000000) >> 24);ptr++;/* stream id */ptr = AMF_EncodeInt24(ptr, pend, 0);}memcpy(ptr, packetBody, nPacketLen);len = nPacketLen;/* correct tagSize and obtain timestamp if we have an FLV stream */if (packet.m_packetType == 0x16){unsigned int pos = 0;int delta;/* grab first timestamp and see if it needs fixing */ //   nTimeStamp = AMF_DecodeInt24(packetBody + 4);//  nTimeStamp |= (packetBody[7] << 24); //   delta = packet.m_nTimeStamp - nTimeStamp;while (pos + 11 < nPacketLen){/* size without header (11) and without prevTagSize (4) */uint32_t dataSize = AMF_DecodeInt24(packetBody + pos + 1);nTimeStamp = AMF_DecodeInt24(packetBody + pos + 4);nTimeStamp |= (packetBody[pos + 7] << 24);//       if (delta) // { //   nTimeStamp += delta; //   AMF_EncodeInt24(ptr+pos+4, pend, nTimeStamp); //   ptr[pos+7] = nTimeStamp>>24; // }/* set data type */r->m_read.dataType |= (((*(packetBody + pos) == 0x08) << 2) |(*(packetBody + pos) == 0x09));if (pos + 11 + dataSize + 4 > nPacketLen){if (pos + 11 + dataSize > nPacketLen){RTMP_Log(RTMP_LOGERROR,"Wrong data size (%lu), stream corrupted, aborting!",dataSize);ret = RTMP_READ_ERROR;break;}RTMP_Log(RTMP_LOGWARNING, "No tagSize found, appending!");/* we have to append a last tagSize! */prevTagSize = dataSize + 11;AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend,prevTagSize);size += 4;len += 4;}else{prevTagSize =AMF_DecodeInt32(packetBody + pos + 11 + dataSize);#ifdef _DEBUGRTMP_Log(RTMP_LOGDEBUG,"FLV Packet: type %02X, dataSize: %lu, tagSize: %lu, timeStamp: %lu ms",(unsigned char)packetBody[pos], dataSize, prevTagSize,nTimeStamp); #endifif (prevTagSize != (dataSize + 11)){ #ifdef _DEBUGRTMP_Log(RTMP_LOGWARNING,"Tag and data size are not consitent, writing tag size according to dataSize+11: %d",dataSize + 11); #endifprevTagSize = dataSize + 11;AMF_EncodeInt32(ptr + pos + 11 + dataSize, pend,prevTagSize);}}pos += prevTagSize + 4; /*(11+dataSize+4); */}}ptr += len;if (packet.m_packetType != 0x16){/* FLV tag packets contain their own prevTagSize */AMF_EncodeInt32(ptr, pend, prevTagSize);}/* In non-live this nTimeStamp can contain an absolute TS.* Update ext timestamp with this absolute offset in non-live mode* otherwise report the relative one*//* RTMP_Log(RTMP_LOGDEBUG, "type: %02X, size: %d, pktTS: %dms, TS: %dms, bLiveStream: %d", packet.m_packetType, nPacketLen, packet.m_nTimeStamp, nTimeStamp, r->Link.lFlags & RTMP_LF_LIVE); */r->m_read.timestamp = (r->Link.lFlags & RTMP_LF_LIVE) ? packet.m_nTimeStamp : nTimeStamp;ret = size;break;}if (rtnGetNextMediaPacket)RTMPPacket_Free(&packet);if (recopy){len = ret > buflen ? buflen : ret;memcpy(buf, r->m_read.buf, len);r->m_read.bufpos = r->m_read.buf + len;r->m_read.buflen = ret - len;}return ret; }


函数功能很多,重要的地方已经加上了注释,在此不再细分析。Read_1_Packet()里面实现从网络中读取视音频数据的函数是RTMP_GetNextMediaPacket()。下面我们来看看该函数的源代码:

int RTMP_GetNextMediaPacket(RTMP *r, RTMPPacket *packet) {int bHasMediaPacket = 0;while (!bHasMediaPacket && RTMP_IsConnected(r)&& RTMP_ReadPacket(r, packet)){if (!RTMPPacket_IsReady(packet)){continue;}bHasMediaPacket = RTMP_ClientPacket(r, packet);if (!bHasMediaPacket){RTMPPacket_Free(packet);}else if (r->m_pausing == 3){if (packet->m_nTimeStamp <= r->m_mediaStamp){bHasMediaPacket = 0; #ifdef _DEBUGRTMP_Log(RTMP_LOGDEBUG,"Skipped type: %02X, size: %d, TS: %d ms, abs TS: %d, pause: %d ms",packet->m_packetType, packet->m_nBodySize,packet->m_nTimeStamp, packet->m_hasAbsTimestamp,r->m_mediaStamp); #endifcontinue;}r->m_pausing = 0;}}if (bHasMediaPacket)r->m_bPlaying = TRUE;else if (r->m_sb.sb_timedout && !r->m_pausing)r->m_pauseStamp = r->m_channelTimestamp[r->m_mediaChannel];return bHasMediaPacket; }


这里有两个函数比较重要:RTMP_ReadPacket()以及RTMP_ClientPacket()。这两个函数中,前一个函数负责从网络上读取数据,后一个负责处理数据。这部分与建立RTMP连接的网络流(NetStream)的时候很相似,参考: RTMPdump(libRTMP) 源代码分析 6: 建立一个流媒体连接 (NetStream部分 1)

RTMP_ClientPacket()在前文中已经做过分析,在此不再重复叙述。在这里重点分析一下RTMP_ReadPacket(),来看看它的源代码。

//读取收下来的Chunk int RTMP_ReadPacket(RTMP *r, RTMPPacket *packet) {//packet 存读取完后的的数据//Chunk Header最大值18uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 };//header 指向的是从Socket中收下来的数据char *header = (char *)hbuf;int nSize, hSize, nToRead, nChunk;int didAlloc = FALSE;RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d", __FUNCTION__, r->m_sb.sb_socket);//收下来的数据存入hbufif (ReadN(r, (char *)hbuf, 1) == 0){RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header", __FUNCTION__);return FALSE;}//块类型fmtpacket->m_headerType = (hbuf[0] & 0xc0) >> 6;//块流ID(2-63)packet->m_nChannel = (hbuf[0] & 0x3f);header++;//块流ID第1字节为0时,块流ID占2个字节if (packet->m_nChannel == 0){if (ReadN(r, (char *)&hbuf[1], 1) != 1){RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte",__FUNCTION__);return FALSE;}//计算块流ID(64-319)packet->m_nChannel = hbuf[1];packet->m_nChannel += 64;header++;}//块流ID第1字节为0时,块流ID占3个字节else if (packet->m_nChannel == 1){int tmp;if (ReadN(r, (char *)&hbuf[1], 2) != 2){RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte",__FUNCTION__);return FALSE;}tmp = (hbuf[2] << 8) + hbuf[1];//计算块流ID(64-65599)packet->m_nChannel = tmp + 64;RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel);header += 2;}//ChunkHeader的大小(4种)nSize = packetSize[packet->m_headerType];if (nSize == RTMP_LARGE_HEADER_SIZE) /* if we get a full header the timestamp is absolute */packet->m_hasAbsTimestamp = TRUE; //11字节的完整ChunkMsgHeader的TimeStamp是绝对值else if (nSize < RTMP_LARGE_HEADER_SIZE){ /* using values from the last message of this channel */if (r->m_vecChannelsIn[packet->m_nChannel])memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel],sizeof(RTMPPacket));}nSize--;if (nSize > 0 && ReadN(r, header, nSize) != nSize){RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. type: %x",__FUNCTION__, (unsigned int)hbuf[0]);return FALSE;}hSize = nSize + (header - (char *)hbuf);if (nSize >= 3){//TimeStamp(注意 BigEndian to SmallEndian)(11,7,3字节首部都有)packet->m_nTimeStamp = AMF_DecodeInt24(header);/*RTMP_Log(RTMP_LOGDEBUG, "%s, reading RTMP packet chunk on channel %x, headersz %i, timestamp %i, abs timestamp %i", __FUNCTION__, packet.m_nChannel, nSize, packet.m_nTimeStamp, packet.m_hasAbsTimestamp); *///消息长度(11,7字节首部都有)if (nSize >= 6){packet->m_nBodySize = AMF_DecodeInt24(header + 3);packet->m_nBytesRead = 0;RTMPPacket_Free(packet);//(11,7字节首部都有)if (nSize > 6){//Msg type IDpacket->m_packetType = header[6];//Msg Stream IDif (nSize == 11)packet->m_nInfoField2 = DecodeInt32LE(header + 7);}}//Extend TimeStampif (packet->m_nTimeStamp == 0xffffff){if (ReadN(r, header + nSize, 4) != 4){RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp",__FUNCTION__);return FALSE;}packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize);hSize += 4;}}RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)hbuf, hSize);if (packet->m_nBodySize > 0 && packet->m_body == NULL){if (!RTMPPacket_Alloc(packet, packet->m_nBodySize)){RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__);return FALSE;}didAlloc = TRUE;packet->m_headerType = (hbuf[0] & 0xc0) >> 6;}nToRead = packet->m_nBodySize - packet->m_nBytesRead;nChunk = r->m_inChunkSize;if (nToRead < nChunk)nChunk = nToRead;/* Does the caller want the raw chunk? */if (packet->m_chunk){packet->m_chunk->c_headerSize = hSize;memcpy(packet->m_chunk->c_header, hbuf, hSize);packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead;packet->m_chunk->c_chunkSize = nChunk;}if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk){RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. len: %lu",__FUNCTION__, packet->m_nBodySize);return FALSE;}RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)packet->m_body + packet->m_nBytesRead, nChunk);packet->m_nBytesRead += nChunk;/* keep the packet as ref for other packets on this channel */if (!r->m_vecChannelsIn[packet->m_nChannel])r->m_vecChannelsIn[packet->m_nChannel] = (RTMPPacket *) malloc(sizeof(RTMPPacket));memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket));//读取完毕if (RTMPPacket_IsReady(packet)){/* make packet's timestamp absolute */if (!packet->m_hasAbsTimestamp)packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel]; /* timestamps seem to be always relative!! */r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp;/* reset the data from the stored packet. we keep the header since we may use it later if a new packet for this channel *//* arrives and requests to re-use some info (small packet header) */r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL;r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0;r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE; /* can only be false if we reuse header */}else{packet->m_body = NULL; /* so it won't be erased on free */}return TRUE; }


函数代码看似很多,但是并不是很复杂,可以理解为在从事“简单重复性劳动”(和搬砖差不多)。基本上是一个字节一个字节的读取,然后按照RTMP协议规范进行解析。具体如何解析可以参考RTMP协议规范。

在RTMP_ReadPacket()函数里完成从Socket中读取数据的函数是ReadN(),继续看看它的源代码:

//从HTTP或SOCKET中读取数据 static int ReadN(RTMP *r, char *buffer, int n) {int nOriginalSize = n;int avail;char *ptr;r->m_sb.sb_timedout = FALSE;#ifdef _DEBUGmemset(buffer, 0, n); #endifptr = buffer;while (n > 0){int nBytes = 0, nRead;if (r->Link.protocol & RTMP_FEATURE_HTTP){while (!r->m_resplen){if (r->m_sb.sb_size < 144){if (!r->m_unackd)HTTP_Post(r, RTMPT_IDLE, "", 1);if (RTMPSockBuf_Fill(&r->m_sb) < 1){if (!r->m_sb.sb_timedout)RTMP_Close(r);return 0;}}HTTP_read(r, 0);}if (r->m_resplen && !r->m_sb.sb_size)RTMPSockBuf_Fill(&r->m_sb);avail = r->m_sb.sb_size;if (avail > r->m_resplen)avail = r->m_resplen;}else{avail = r->m_sb.sb_size;if (avail == 0){if (RTMPSockBuf_Fill(&r->m_sb) < 1){if (!r->m_sb.sb_timedout)RTMP_Close(r);return 0;}avail = r->m_sb.sb_size;}}nRead = ((n < avail) ? n : avail);if (nRead > 0){memcpy(ptr, r->m_sb.sb_start, nRead);r->m_sb.sb_start += nRead;r->m_sb.sb_size -= nRead;nBytes = nRead;r->m_nBytesIn += nRead;if (r->m_bSendCounter&& r->m_nBytesIn > r->m_nBytesInSent + r->m_nClientBW / 2)SendBytesReceived(r);}/*RTMP_Log(RTMP_LOGDEBUG, "%s: %d bytes\n", __FUNCTION__, nBytes); */ #ifdef _DEBUGfwrite(ptr, 1, nBytes, netstackdump_read); #endifif (nBytes == 0){RTMP_Log(RTMP_LOGDEBUG, "%s, RTMP socket closed by peer", __FUNCTION__);/*goto again; */RTMP_Close(r);break;}if (r->Link.protocol & RTMP_FEATURE_HTTP)r->m_resplen -= nBytes;#ifdef CRYPTOif (r->Link.rc4keyIn){RC4_encrypt((RC4_KEY *)r->Link.rc4keyIn, nBytes, ptr);} #endifn -= nBytes;ptr += nBytes;}return nOriginalSize - n; }


ReadN()中实现从Socket中接收数据的函数是RTMPSockBuf_Fill(),看看代码吧(又是层层调用)。

//调用Socket编程中的recv()函数,接收数据 int RTMPSockBuf_Fill(RTMPSockBuf *sb) {int nBytes;if (!sb->sb_size)sb->sb_start = sb->sb_buf;while (1){//缓冲区长度:总长-未处理字节-已处理字节//|-----已处理--------|-----未处理--------|---------缓冲区----------|//sb_buf        sb_start    sb_size     nBytes = sizeof(sb->sb_buf) - sb->sb_size - (sb->sb_start - sb->sb_buf); #if defined(CRYPTO) && !defined(NO_SSL)if (sb->sb_ssl){nBytes = TLS_read((SSL *)sb->sb_ssl, sb->sb_start + sb->sb_size, nBytes);}else #endif{//int recv( SOCKET s, char * buf, int len, int flags);//s:一个标识已连接套接口的描述字。//buf:用于接收数据的缓冲区。 //len:缓冲区长度。//flags:指定调用方式。//从sb_start(待处理的下一字节) + sb_size()还未处理的字节开始buffer为空,可以存储nBytes = recv(sb->sb_socket, sb->sb_start + sb->sb_size, nBytes, 0);}if (nBytes != -1){//未处理的字节又多了sb->sb_size += nBytes;}else{int sockerr = GetSockError();RTMP_Log(RTMP_LOGDEBUG, "%s, recv returned %d. GetSockError(): %d (%s)",__FUNCTION__, nBytes, sockerr, strerror(sockerr));if (sockerr == EINTR && !RTMP_ctrlC)continue;if (sockerr == EWOULDBLOCK || sockerr == EAGAIN){sb->sb_timedout = TRUE;nBytes = 0;}}break;}return nBytes; }


从RTMPSockBuf_Fill()代码中可以看出,调用了系统Socket的recv()函数接收RTMP连接传输过来的数据。


rtmpdump源代码(Linux):http://download.csdn.net/detail/leixiaohua1020/6376561

rtmpdump源代码(VC 2005 工程):http://download.csdn.net/detail/leixiaohua1020/6563163




转载于:https://my.oschina.net/leixiaohua1020/blog/302008

总结

以上是生活随笔为你收集整理的RTMPdump(libRTMP) 源代码分析 9: 接收消息(Message)(接收视音频数据)的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。