通过前面的文章我们可以了解到,当Transport创建完成时,socket已经建立好,并具备了相应的网络传输能力。下面我们来看一下socket接收到数据之后是如何处理的。
UdpSocketHandler::OnUvRecv
Socket接收数据
/**
 * Handles the outcome of a UDP read on this socket.
 *
 * @param nread  Number of bytes read; 0 means nothing to read (or an empty
 *               datagram), a negative value is an error code.
 * @param buf    Buffer holding the received bytes.
 * @param addr   Address of the sender.
 * @param flags  Receive flags; UV_UDP_PARTIAL marks a truncated datagram.
 */
inline void UdpSocketHandler::OnUvRecv(
  ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned int flags)
{
    MS_TRACE();

    // Nothing to read, or an empty datagram: ignore.
    if (nread == 0)
    {
        return;
    }

    // A datagram that did not fit in the buffer is reported and dropped.
    const bool truncated = (flags & UV_UDP_PARTIAL) != 0u;

    if (truncated)
    {
        MS_ERROR("received datagram was truncated due to insufficient buffer, ignoring it");

        return;
    }

    // Negative value: some read error.
    if (nread < 0)
    {
        MS_DEBUG_DEV("read error: %s", uv_strerror(nread));

        return;
    }

    // Data received: account for it, then let the subclass consume it.
    this->recvBytes += nread;

    UserOnUdpDatagramReceived(reinterpret_cast<uint8_t*>(buf->base), nread, addr);
}
UserOnUdpDatagramReceived
具体由其子类UdpSocket实现,其中listener是创建transport时对应的具体transport(例如下文的PlainTransport)
/**
 * Forwards a received datagram to the registered listener.
 *
 * If no listener is set, an error is logged and the datagram is dropped.
 *
 * @param data  Start of the received datagram.
 * @param len   Datagram length in bytes.
 * @param addr  Address of the sender.
 */
void UdpSocket::UserOnUdpDatagramReceived(const uint8_t* data, size_t len, const struct sockaddr* addr)
{
    MS_TRACE();

    if (this->listener)
    {
        // Notify the reader.
        this->listener->OnUdpSocketPacketReceived(this, data, len, addr);

        return;
    }

    MS_ERROR("no listener set");
}
OnUdpSocketPacketReceived
以PlainTransport为例
/**
 * Listener callback invoked when a UDP socket has received data.
 *
 * Wraps the socket and the sender address in a temporary TransportTuple and
 * passes the data on to OnPacketReceived().
 */
inline void PlainTransport::OnUdpSocketPacketReceived(
  RTC::UdpSocket* socket, const uint8_t* data, size_t len, const struct sockaddr* remoteAddr)
{
    MS_TRACE();

    // Tuple built from the receiving socket and the remote address.
    RTC::TransportTuple sourceTuple(socket, remoteAddr);

    // Continue processing inside this transport.
    OnPacketReceived(&sourceTuple, data, len);
}
PlainTransport::OnPacketReceived
/**
 * Classifies a received packet and dispatches it to the matching handler.
 *
 * The checks run in this order: RTCP, then RTP, then SCTP. The first match
 * wins; anything else is ignored with a warning.
 *
 * @param tuple  Tuple describing where the packet came from.
 * @param data   Start of the received packet.
 * @param len    Packet length in bytes.
 */
inline void PlainTransport::OnPacketReceived(RTC::TransportTuple* tuple, const uint8_t* data, size_t len)
{
    MS_TRACE();

    // Increase receive transmission.
    RTC::Transport::DataReceived(len);

    // RTCP?
    if (RTC::RTCP::Packet::IsRtcp(data, len))
    {
        OnRtcpDataReceived(tuple, data, len);

        return;
    }

    // RTP?
    if (RTC::RtpPacket::IsRtp(data, len))
    {
        OnRtpDataReceived(tuple, data, len);

        return;
    }

    // SCTP?
    if (RTC::SctpAssociation::IsSctp(data, len))
    {
        OnSctpDataReceived(tuple, data, len);

        return;
    }

    MS_WARN_DEV("ignoring received packet of unknown type");
}
RTP数据处理方式
首先判断是否使用了SRTP,如果是则先对数据进行解密;然后将数据解析为RtpPacket;最后把解析好的packet传递给父类Transport继续处理
// Handles data that OnPacketReceived() classified as RTP.
//
// Steps, in order:
//  1. If SRTP is in use but not ready, return without doing anything.
//  2. If SRTP is in use, decrypt; on failure log a warning and return.
//  3. Parse the data into an RtpPacket; on failure log a warning and return.
//  4. If no RTP tuple is stored yet: in comedia mode adopt the packet's tuple
//     (and mark the transport connected if it was not), otherwise drop.
//     If a tuple is stored, drop packets whose origin does not match it.
//  5. Pass the packet to RTC::Transport::ReceiveRtpPacket().
//
// Every path that drops a parsed packet deletes it here; the packet passed to
// RTC::Transport::ReceiveRtpPacket() is not deleted in this function.
//
// tuple: origin of the packet. data/len: the received bytes.
inline void PlainTransport::OnRtpDataReceived(
RTC::TransportTuple* tuple, const uint8_t* data, size_t len)
{
MS_TRACE();
// SRTP is in use but not ready yet: drop silently.
if (HasSrtp() && !IsSrtpReady())
return;
// Decrypt the SRTP packet.
// DecryptSrtp() receives the buffer with const cast away plus a pointer to
// intLen; presumably it decrypts in place and updates intLen -- TODO confirm.
auto intLen = static_cast<int>(len);
if (HasSrtp() && !this->srtpRecvSession->DecryptSrtp(const_cast<uint8_t*>(data), &intLen))
{
// Decryption failed: parse only to be able to log which packet it was.
RTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, static_cast<size_t>(intLen));
if (!packet)
{
MS_WARN_TAG(srtp, "DecryptSrtp() failed due to an invalid RTP packet");
}
else
{
MS_WARN_TAG(
srtp,
"DecryptSrtp() failed [ssrc:%" PRIu32 ", payloadType:%" PRIu8 ", seq:%" PRIu16 "]",
packet->GetSsrc(),
packet->GetPayloadType(),
packet->GetSequenceNumber());
delete packet;
}
return;
}
// Parse the received data into an RtpPacket, using intLen as its length.
RTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, static_cast<size_t>(intLen));
if (!packet)
{
MS_WARN_TAG(rtp, "received data is not a valid RTP packet");
return;
}
// If we don't have a RTP tuple yet, check whether comedia mode is set.
if (!this->tuple)
{
if (!this->comedia)
{
MS_DEBUG_TAG(rtp, "ignoring RTP packet while not connected");
// Remove this SSRC.
RecvStreamClosed(packet->GetSsrc());
delete packet;
return;
}
MS_DEBUG_TAG(rtp, "setting RTP tuple (comedia mode enabled)");
// Sample the connected state before the tuple is stored.
auto wasConnected = IsConnected();
this->tuple = new RTC::TransportTuple(tuple);
if (!this->listenIp.announcedIp.empty())
this->tuple->SetLocalAnnouncedIp(this->listenIp.announcedIp);
// If not yet connected do it now.
if (!wasConnected)
{
// Notify the Node PlainTransport.
// NOTE: this local `data` shadows the function parameter of the same name.
json data = json::object();
this->tuple->FillJson(data["tuple"]);
this->shared->channelNotifier->Emit(this->id, "tuple", data);
RTC::Transport::Connected();
}
}
// Otherwise, if RTP tuple is set, verify that it matches the origin
// of the packet.
else if (!this->tuple->Compare(tuple))
{
MS_DEBUG_TAG(rtp, "ignoring RTP packet from unknown IP:port");
// Remove this SSRC.
RecvStreamClosed(packet->GetSsrc());
delete packet;
return;
}
// Pass the packet to the parent transport.
RTC::Transport::ReceiveRtpPacket(packet);
}
Transport::ReceiveRtpPacket
/**
 * Processes an RTP packet delivered by a concrete transport (in this article,
 * PlainTransport::OnRtpDataReceived()).
 *
 * Applies the transport's RTP header extension ids to the packet, feeds the
 * congestion control server if one exists, looks up the Producer for the
 * packet and hands the packet to it. The Producer's result selects which
 * receive counter is updated.
 *
 * The packet is deleted before returning, on every path.
 */
void Transport::ReceiveRtpPacket(RTC::RtpPacket* packet)
{
    MS_TRACE();

    packet->logger.recvTransportId = this->id;

    // Apply the Transport RTP header extension ids so the RTP listener can use them.
    const auto& extensionIds = this->recvRtpHeaderExtensionIds;

    packet->SetMidExtensionId(extensionIds.mid);
    packet->SetRidExtensionId(extensionIds.rid);
    packet->SetRepairedRidExtensionId(extensionIds.rrid);
    packet->SetAbsSendTimeExtensionId(extensionIds.absSendTime);
    packet->SetTransportWideCc01ExtensionId(extensionIds.transportWideCc01);

    auto nowMs = DepLibUV::GetTimeMs();

    // Feed the TransportCongestionControlServer.
    if (this->tccServer)
    {
        this->tccServer->IncomingPacket(nowMs, packet);
    }

    // Find the Producer associated with the received packet.
    RTC::Producer* producer = this->rtpListener.GetProducer(packet);

    if (producer == nullptr)
    {
        packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::PRODUCER_NOT_FOUND);

        MS_WARN_TAG(
          rtp,
          "no suitable Producer for received RTP packet [ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]",
          packet->GetSsrc(),
          packet->GetPayloadType());

        // Tell the child class to remove this SSRC.
        RecvStreamClosed(packet->GetSsrc());

        delete packet;

        return;
    }

    // MS_DEBUG_DEV(
    //   "RTP packet received [ssrc:%" PRIu32 ", payloadType:%" PRIu8 ", producerId:%s]",
    //   packet->GetSsrc(),
    //   packet->GetPayloadType(),
    //   producer->id.c_str());

    // Pass the RTP packet to the corresponding Producer.
    using Result = RTC::Producer::ReceiveRtpPacketResult;

    const auto outcome = producer->ReceiveRtpPacket(packet);

    if (outcome == Result::MEDIA)
    {
        // Receive statistics of the media channel.
        this->recvRtpTransmission.Update(packet);
    }
    else if (outcome == Result::RETRANSMISSION)
    {
        // Receive statistics of the retransmission (RTX) channel.
        this->recvRtxTransmission.Update(packet);
    }
    else if (outcome == Result::DISCARDED)
    {
        // Tell the child class to remove this SSRC.
        RecvStreamClosed(packet->GetSsrc());
    }

    // Release the RTP packet.
    delete packet;
}
Producer::ReceiveRtpPacket
/*
 * Processes an RTP packet handed over by the transport.
 *
 * Returns:
 *  - DISCARDED when no RTP stream is found for the packet, or when
 *    MangleRtpPacket() returns false.
 *  - MEDIA when the packet SSRC equals the stream SSRC (also when the stream
 *    rejects the packet, or when the producer is paused).
 *  - RETRANSMISSION when the packet SSRC equals the stream RTX SSRC (also when
 *    the stream rejects the packet, or when the producer is paused).
 *
 * The packet is not deleted in this function.
 */
Producer::ReceiveRtpPacketResult Producer::ReceiveRtpPacket(RTC::RtpPacket* packet)
{
MS_TRACE();
packet->logger.producerId = this->id;
// Reset current packet.
this->currentRtpPacket = nullptr;
// Count number of RTP streams, so a stream added by GetRtpStream() below can
// be detected by comparing sizes.
auto numRtpStreamsBefore = this->mapSsrcRtpStream.size();
/* Get the receive stream that corresponds to this packet. */
auto* rtpStream = GetRtpStream(packet);
if (!rtpStream)/* No RTP receive stream was found for this packet. */
{
MS_WARN_TAG(rtp, "no stream found for received packet [ssrc:%" PRIu32 "]", packet->GetSsrc());
packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_NOT_FOUND);
return ReceiveRtpPacketResult::DISCARDED;/* Discard the packet. */
}
// Pre-process the packet.
/* NOTE(review): reportedly sets header extension ids for video packets --
   confirm in PreProcessRtpPacket(). */
PreProcessRtpPacket(packet);
ReceiveRtpPacketResult result;
bool isRtx{ false };/* Set to true below when the packet arrived on the RTX SSRC. */
// Media packet.
/* The packet SSRC is the main SSRC of the stream. */
if (packet->GetSsrc() == rtpStream->GetSsrc())
{
/* Result reported to the caller for a media packet. */
result = ReceiveRtpPacketResult::MEDIA;
// Process the packet.
/* Let the RTP receive stream handle the packet. */
if (!rtpStream->ReceivePacket(packet))
{
// May have to announce a new RTP stream to the listener.
/* If a new RTP receive stream was added, announce it even though this packet is dropped. */
if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore)
NotifyNewRtpStream(rtpStream); /* NOTE(review): presumably ends up notifying the consumers of this producer -- confirm. */
packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_DISCARDED);
return result;
}
}
// RTX packet.
/* The packet SSRC is the RTX SSRC of the stream. */
else if (packet->GetSsrc() == rtpStream->GetRtxSsrc())
{
result = ReceiveRtpPacketResult::RETRANSMISSION;
isRtx = true;
// Process the packet.
/* Let the RTP receive stream handle the RTX packet. */
if (!rtpStream->ReceiveRtxPacket(packet))
{
// NOTE(review): the drop reason logged here is RECV_RTP_STREAM_NOT_FOUND
// although a stream was found -- confirm this is intended.
packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_NOT_FOUND);
return result;
}
}
// Should not happen.
else
{
MS_ABORT("found stream does not match received packet");
}
/* Check whether the packet belongs to a key frame. */
if (packet->IsKeyFrame())
{
MS_DEBUG_TAG(
rtp,
"key frame received [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
packet->GetSsrc(),
packet->GetSequenceNumber());
// Tell the keyFrameRequestManager.
if (this->keyFrameRequestManager)
this->keyFrameRequestManager->KeyFrameReceived(packet->GetSsrc()); /* Report the received key frame. */
}
// May have to announce a new RTP stream to the listener.
if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore)
{
// Request a key frame for this stream since we may have lost the first packets
// (do not do it if this is a key frame).
if (this->keyFrameRequestManager && !this->paused && !packet->IsKeyFrame())
this->keyFrameRequestManager->ForceKeyFrameNeeded(packet->GetSsrc());
// Update current packet.
// currentRtpPacket is only set for the duration of the notification call.
this->currentRtpPacket = packet;
NotifyNewRtpStream(rtpStream);
// Reset current packet.
this->currentRtpPacket = nullptr;
}
// If paused stop here.
if (this->paused)
return result;
// May emit 'trace' event.
EmitTraceEventRtpAndKeyFrameTypes(packet, isRtx);
// Mangle the packet before providing the listener with it.
/* NOTE(review): reportedly rewrites payload type, SSRC and header
   extensions -- confirm in MangleRtpPacket(). */
if (!MangleRtpPacket(packet, rtpStream))
return ReceiveRtpPacketResult::DISCARDED;
// Post-process the packet.
/* One final processing pass over the packet. */
PostProcessRtpPacket(packet);
/* Hand the processed packet to the listener. */
this->listener->OnProducerRtpPacketReceived(this, packet);
return result;
}
向上传递到Transport层
/**
 * Producer callback: a Producer of this transport has an RTP packet to publish.
 *
 * Forwards the producer and the packet, together with this transport, to the
 * transport's own listener (per the surrounding article, the Router).
 */
inline void Transport::OnProducerRtpPacketReceived(RTC::Producer* producer, RTC::RtpPacket* packet)
{
    MS_TRACE();

    // Pure pass-through to the next layer up.
    this->listener->OnTransportProducerRtpPacketReceived(this, producer, packet);
}
向上传递到Router层
/**
 * Transport callback: distributes a producer's RTP packet.
 *
 * The packet is sent to every Consumer registered for the producer (after
 * updating the packet's MID to the consumer's MID when that is non-empty),
 * and then given to every RTP observer registered for the producer.
 *
 * The producer must already have an entry in mapProducerConsumers, because
 * the lookup uses at().
 */
inline void Router::OnTransportProducerRtpPacketReceived(
  RTC::Transport* /*transport*/, RTC::Producer* producer, RTC::RtpPacket* packet)
{
    MS_TRACE();

    packet->logger.routerId = this->id;

    // Look up the consumers subscribed to this producer.
    auto& subscribedConsumers = this->mapProducerConsumers.at(producer);

    // Cloned ref-counted packet that RtpStreamSend will store for as long as
    // needed avoiding multiple allocations unless absolutely necessary.
    // Clone only happens if needed.
    std::shared_ptr<RTC::RtpPacket> sharedPacket;

    // With no consumers this loop simply does not run.
    for (auto* consumer : subscribedConsumers)
    {
        // Update MID RTP extension value.
        const auto& consumerMid = consumer->GetRtpParameters().mid;

        if (!consumerMid.empty())
        {
            packet->UpdateMid(consumerMid);
        }

        // Send the RTP packet through this consumer.
        consumer->SendRtpPacket(packet, sharedPacket);
    }

    // RTP observers attached to this producer, if any.
    auto observersIt = this->mapProducerRtpObservers.find(producer);

    if (observersIt == this->mapProducerRtpObservers.end())
    {
        return;
    }

    for (auto* rtpObserver : observersIt->second)
    {
        rtpObserver->ReceiveRtpPacket(producer, packet);
    }
}
具体transport通道转发数据
/**
 * Sends an RTP packet through this transport's tuple.
 *
 * If the transport is not connected, or SRTP is in use and encryption fails,
 * nothing is sent: the callback (when given) is invoked with false and then
 * deleted. Otherwise the callback is handed to the tuple's Send().
 *
 * @param packet  Packet to send.
 * @param cb      Optional heap-allocated completion callback.
 */
void PlainTransport::SendRtpPacket(
  RTC::Consumer* /*consumer*/, RTC::RtpPacket* packet, RTC::Transport::onSendCallback* cb)
{
    MS_TRACE();

    // Reports failure through the callback (if any) and releases it.
    const auto reportFailure = [cb]()
    {
        if (cb)
        {
            (*cb)(false);
            delete cb;
        }
    };

    if (!IsConnected())
    {
        reportFailure();

        return;
    }

    const uint8_t* data = packet->GetData();
    auto intLen         = static_cast<int>(packet->GetSize());

    // With SRTP, EncryptRtp() receives pointers to both data and intLen.
    if (HasSrtp())
    {
        const bool encrypted = this->srtpSendSession->EncryptRtp(&data, &intLen);

        if (!encrypted)
        {
            reportFailure();

            return;
        }
    }

    const auto len = static_cast<size_t>(intLen);

    // Send the RTP data through the tuple.
    this->tuple->Send(data, len, cb);

    // Increase send transmission.
    RTC::Transport::DataSent(len);
}
/**
 * Sends data over the connection this tuple refers to.
 *
 * A UDP tuple sends through its UDP socket to the stored remote address; any
 * other protocol sends through the TCP connection.
 *
 * @param data  Bytes to send.
 * @param len   Number of bytes.
 * @param cb    Optional completion callback, forwarded unchanged.
 */
void Send(const uint8_t* data, size_t len, RTC::TransportTuple::onSendCallback* cb = nullptr)
{
    if (this->protocol != Protocol::UDP)
    {
        this->tcpConnection->Send(data, len, cb);

        return;
    }

    this->udpSocket->Send(data, len, this->udpRemoteAddr, cb);
}
底层实际发送
void UdpSocketHandler::Send(
const uint8_t* data, size_t len, const struct sockaddr* addr, UdpSocketHandler::onSendCallback* cb)
{
MS_TRACE();
if (this->closed)
{
if (cb)
{
(*cb)(false);
delete cb;
}
return;
}
if (len == 0)
{
if (cb)
{
(*cb)(false);
delete cb;
}
return;
}
// First try uv_udp_try_send(). In case it can not directly send the datagram
// then build a uv_req_t and use uv_udp_send().
uv_buf_t buffer = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);
const int sent = uv_udp_try_send(this->uvHandle, &buffer, 1, addr);
// Entire datagram was sent. Done.
if (sent == static_cast<int>(len))
{
// Update sent bytes.
this->sentBytes += sent;
if (cb)
{
(*cb)(true);
delete cb;
}
return;
}
else if (sent >= 0)
{
MS_WARN_DEV("datagram truncated (just %d of %zu bytes were sent)", sent, len);
// Update sent bytes.
this->sentBytes += sent;
if (cb)
{
(*cb)(false);
delete cb;
}
return;
}
// Any error but legit EAGAIN. Use uv_udp_send().
else if (sent != UV_EAGAIN)
{
MS_WARN_DEV("uv_udp_try_send() failed, trying uv_udp_send(): %s", uv_strerror(sent));
}
auto* sendData = new UvSendData(len);
sendData->req.data = static_cast<void*>(sendData);
std::memcpy(sendData->store, data, len);
sendData->cb = cb;
buffer = uv_buf_init(reinterpret_cast<char*>(sendData->store), len);
int err = uv_udp_send(
&sendData->req, this->uvHandle, &buffer, 1, addr, static_cast<uv_udp_send_cb>(onSend));
if (err != 0)
{
// NOTE: uv_udp_send() returns error if a wrong INET family is given
// (IPv6 destination on a IPv4 binded socket), so be ready.
MS_WARN_DEV("uv_udp_send() failed: %s", uv_strerror(err));
if (cb)
(*cb)(false);
// Delete the UvSendData struct (it will delete the store and cb too).
delete sendData;
}
else
{
// Update sent bytes.
this->sentBytes += len;
}
}