Pacer起到平滑碼率的作用,使發送到網絡上的碼率穩定。如下的這張創建Pacer的流程圖,其中PacerSender就是Pacer,其中PacerSender就是Pacer。這篇文章介紹它的核心子類PacingController及Periodic模式下平滑處理的基本流程。平滑處理流程中還有與帶寬探測所關聯的流程,在本篇文章中并不涉及。
從上圖中可以看到,在創建Call
對象時,會創建一個RtpTransportControllerSend
,它是Call對象中發送數據的大總管,而PacerSender
也是屬于它管理的對象。
一個Call
對象中一個RtpTransportControllerSend
,一個RtpTransportControllerSend
中一個PacerSender
,所以Pacer是作用于Call中所有的stream,這里并不是只處理音視頻包,還有fec包,重傳包,padding包,Call
對象中也發送出去的數據都會經過Pacer。
這篇文章是介紹平滑實現的基本原理和Pacer中的Periodic
模式的處理流程。Pacer的流程中還有與帶寬探測所關聯的流程,在本篇文章中并不涉及。
碼率平滑的原理
在視頻編碼中,雖然編碼器會將輸出碼流的碼率控制在所設置的碼率范圍內。但是在編碼器產生關鍵幀或在畫面變化比較大時,碼率可能超過設置的碼率值。在有fec或重傳包時,也可能造成實際發送的碼率值超過目標值。這種突發的大碼率的數據,可能就會造成網絡鏈路擁塞。
所以引入的pacer就是平滑發送的碼率值,在一段時間內,保證發送碼率接近設置目標碼率值。而避免突發的高碼率造成網絡鏈路擁塞。
平滑的基本原理就是**緩存隊列+周期發送,將要發送的數據先緩存,在周期性的發送出去,起到平均碼率的目的。那么這種周期有兩種模式:**
**kPeriodic**
,周期模式,也是默認模式,以固定間隔時間發送數據。kDynamic
,動態模式,根據數據的緩存時長及數據量來計算下一次發送數據的時間點。
組成
pacer的流程都實現在PacingController
,包括兩個核心類:RoundBoinPacketQueue
,IntervalBudget
。
RoundBobinPacketQueue
緩存隊列,對每條流都會緩存,以ssrc做為流的唯一標識,包括:重傳包,fec,padding包。IntervalBudget
根據設置的目標碼率值及時間間隔計算可發送的數據量。
PacingController類
所屬文件為\modules\pacing\pacing_controller.h
,如下類圖:
兩個核心的成員變量:
RoundRobinPakcetQueue packet_queue_
packet的緩存隊列。IntervalBudget media_buget_
可發送數據量計算。
兩個核心函數:
NextSendTime
,獲取每次執行的時間(5毫秒,在kPeriodic
模式下)。ProcessPackets
,周期處理包的發送,確定要發送的數據量,從緩存隊列中取包。
平滑邏輯的處理流程
整個pacer運行的機制就是靠PacingController
的NextSendTime
和ProcessPackets
兩個方法,它們被單獨的放在一個ModuleThread
線程中執行,周期性的被執行,兩個方法調用的堆棧如下:
**NextSendTime**
peerconnection_client.exe!webrtc::PacingController::NextSendTime() 行 348 C++
peerconnection_client.exe!webrtc::PacedSender::TimeUntilNextProcess() 行 171 C++
peerconnection_client.exe!webrtc::PacedSender::ModuleProxy::TimeUntilNextProcess() 行 150 C++
peerconnection_client.exe!webrtc::`anonymous namespace’::GetNextCallbackTime(webrtc::Module * module, __int64 time_now) 行 30 C++
peerconnection_client.exe!webrtc::ProcessThreadImpl::Process() 行 231 C++
peerconnection_client.exe!webrtc::ProcessThreadImpl::Run(void * obj) 行 198 C++
peerconnection_client.exe!rtc::PlatformThread::Run() 行 130 C++
peerconnection_client.exe!rtc::PlatformThread::StartThread(void * param) 行 62 C++
**ProcessPackets**
peerconnection_client.exe!webrtc::PacingController::ProcessPackets() 行 408 C++
peerconnection_client.exe!webrtc::PacedSender::Process() 行 183 C++
peerconnection_client.exe!webrtc::PacedSender::ModuleProxy::Process() 行 152 C++
peerconnection_client.exe!webrtc::ProcessThreadImpl::Process() 行 226 C++
peerconnection_client.exe!webrtc::ProcessThreadImpl::Run(void * obj) 行 198 C++
peerconnection_client.exe!rtc::PlatformThread::Run() 行 130 C++
peerconnection_client.exe!rtc::PlatformThread::StartThread(void * param) 行 62 C++
核心骨架就是下面三個步驟:
- 設置目標碼率,通過
SetPacingRates(...)
方法。 - 計算每個時間片可以發送的數據,在
UpdateBudgetWithElapsedTime(TimeDelta delta)
方法中。 - 用已發送的數據量來計算還剩多少數據量可以發送,在
UpdateBudgetWithSentData(DataSize size)
方法中。
詳細流程:
(1). 如果媒體數據包處理模式是 kDynamic,則檢查期望的發送時間和 前一次數據包處理時間 的對比,當前者大于后者時,則根據兩者的差值更新預計仍在傳輸中的數據量,以及 前一次數據包處理時間;(Periodic模式是5ms執行一次)
(2). 從媒體數據包優先級隊列中取一個數據包出來;
(3). 第 (2) 步中取出的數據包為空,但已經發送的媒體數據的量還沒有達到碼率探測器 webrtc::BitrateProber 建議發送的最小探測數據量,則創建一些填充數據包放入媒體數據包優先級隊列,并繼續下一輪處理;
(4). 發送取出的媒體數據包;
(5). 獲取 FEC 數據包,并放入媒體數據包優先級隊列;
(6). 根據發送的數據包的數據量,更新預計仍在傳輸中的數據量等信息;
(7). 如果是在碼率探測期間,且發送的數據量超出碼率探測器 webrtc::BitrateProber 建議發送的最小探測數據量,則結束發送過程;
(8). 如果媒體數據包處理模式是 kDynamic,則更新目標發送時間。
RoundBobinPacketQueue
RoundBobinPacketQueue
是一個緩存隊列, 用于緩存數據包(音視頻包,fec,padding,重傳包),它有兩個特征:
- 根據優先級存儲包(每種類型包都有優先級)。
- 記錄緩存時長(記錄每個包的入隊時間,用于計算緩存的總時長,避免引入過多的延遲)。
類圖
上圖種的Stream
類代表了一路流,QueuePacket
類代表了數據包。
RoundBobinPacketQueue
三個核心的數據結構:
std::map<uint32_t, Stream> streams_
key為ssrc。
std::multimap<StreamPrioKey, uint32_t> **stream_priorities_**
**Stream**
的優先級信息表,以**priority**
和**DataSize**
為比較的key,value是ssrc。通過優先級找Stream
,方便優先級變化的實現。越靠前,優先級越高。
Stream
類中的std::multimap<StreamPrioKey, uint32_t>::iterator priority_it;
它指向 RoundBobinPacketQueue
中的stream_priorities_
中的某項,可以快速定位到自己的優先級。
std::multiset<Timestamp> enqueue_times_
The enqueue time of every packet currently in the queue. Used to figure out the age of the oldest packet in the queue.
記錄每一個包的入隊時間
QueuedPacket
對象中的std::multiset<Timestamp>::iterator enqueue_time_it_;
指向enqueue_times_
中的項,可以快速定位到自己的入隊時間。
Stream
,QueuePacket
,RoundBobinPacketQueue
關系圖
如下是Stream
對象,QueuePacket
對象與RoundBobinPacketQueue
對象的關系圖。
上圖是以Stream
為中心,描繪Stream
,QueuePacket
,RoundBobinPacketQueue
的關系。
- 每個
Stream
都被記錄在RoundRobinPacketQueue
的streams_
中,以ssrc為key。 - 每個
Stream
的優先級都被記錄在RoundRobinPacketQueue
的stream_priorites_
中,以優先級為key,ssrc為value。 - 數據包都被封裝成
QueuePacket
緩存在Stream
對象的packet_queue
中,它也是一個優先級隊列,所以每個數據包都是有優先級的。 RoundRobinPacketQueue
的enqueue_times_
記錄著每個rtp packet的入隊時間。stream
中std::multimap<StreamPrioKey,uint32_t>::iterator priority_it
迭代器指向該stream在stream_priorites_
中的位置,便于快速檢索。QueuedPacket
中的std::multiset<Timestamp>::iterator enqueue_time_it
迭代器指向該packet在enqueue_times_
中的位置,便于快速檢索。
緩存隊列中記錄的信息有:
- 記錄總的緩存包個數。
- 記錄總的數據量。
- 記錄包的優先級。
- 記錄包的入隊時間(計算包的緩存總時長,平均緩存時間,最大緩存時間)。
插入隊列(push方法)的邏輯
- 從streams_中找pakcet所屬的Ssrc的stream,如果沒有,則在streams_中插入一項。
- 查看stream的priority_it是否等于stream_priorities_的end():如果相等,則在stream_priorities插入新的項; 否則,如果新包的優先級高,則更新其ssrc對應隊列的優先級。
- 更新隊列總時長。
- 入隊時間減去暫停時間(一般不會有暫停)。
- 隊列總包數+1。
- 隊列總字節大小+包的負載大小+Padding大小(Packet的大小)。
- 插入到steam對象的packet_queue中。
push流程的注意點:
- stream的size指的是stream發送的size,在Pop中,會加上彈出的PacketSize。
- 一條stream的packet的priority值都是一樣的。
- 在入隊一個stream的新的packet時,并不確定優先級,觸發優先級隊列中沒有記錄或packet的優先級發生變化。
取數據(Pop方法)的邏輯
- 獲得優先級最高的stream。
- 從stream的packet_queue中取出第一個Packet。
- 將stream在stream_priorites_中的項刪除掉。
- 計算Packet入隊后到現在的時間(不包括暫停時間)。
- 將這段時間從隊列的總時間中減去。
- 從equeue_times_中將Packet的項刪除。
- 總包數減一。
- 總字節數減去包的字節數。
- 將包從stream中的queue中彈出。
- 如果stream中的隊列為空,則令stream的priority_it指向stream_priorities的end()。
- 否則,從stream隊列頭部取Packet,將該Packet的priority插入到stream_priorities_中。
緩存時間的計算
計算緩存時間的目的是控制延遲,包括如下幾個方法:
- 獲取緩存時間最長的包
Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const {if (single_packet_queue_.has_value()) {return single_packet_queue_->EnqueueTime();}if (Empty())return Timestamp::MinusInfinity();RTC_CHECK(!enqueue_times_.empty());return *enqueue_times_.begin();
}
這個方法是用于統計,最終會被call
對象的GetStats()
方法調用。
- 計算總延時,
UpdateQueueTime
每次被調用,總時長都會被計算,累加。
void RoundRobinPacketQueue::UpdateQueueTime(Timestamp now) {RTC_CHECK_GE(now, time_last_updated_);if (now == time_last_updated_)return;TimeDelta delta = now - time_last_updated_;if (paused_) {pause_time_sum_ += delta;} else {//有n個包,每調一次UpdateQueueTime就有一個delta值,總數為size of packet乘以deltaqueue_time_sum_ += TimeDelta::Micros(delta.us() * size_packets_);}time_last_updated_ = now;
}
- 計算平均緩存時間
平均緩存時間=queue的總時間數/包數,用于判斷延時(緩存時間)是否過大。
TimeDelta RoundRobinPacketQueue::AverageQueueTime() const {if (Empty())return TimeDelta::Zero();return queue_time_sum_ / size_packets_;
}
控制延時
在PacingController::ProcessPackets()
方法中,會計算包的緩存時間,如下if
分支
if (drain_large_queues_) {//限制延時TimeDelta avg_time_left =std::max(TimeDelta::Millis(1),queue_time_limit - packet_queue_.AverageQueueTime());DataRate min_rate_needed = queue_size_data / avg_time_left;if (min_rate_needed > target_rate) {target_rate = min_rate_needed;RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="<< target_rate.kbps();}
}
首先會計算緩存隊列的的平均緩存時間,通過設置的緩存時間限制值減去它得出應該要在多長時間發送這些數據。
再計算發送速率,最后設置目標碼率值。這個目標碼率值會被設置到media_buget
中去(kPeriodic
模式下)。
快速處理
緩存隊列緩存數據,肯定會引入延遲,在RonundBobinPacketQeueu
有一個absl::optional<QueuedPacket> single_packet_queue_
成員變量,它的作用就是快速處理數據包。
只有音頻流時的處理
音頻對延遲很敏感,需要盡量少引入延遲。在RoundRobinPacketQueue::Push
中,有一個分支,如下:
if (size_packets_ == 0) {single_packet_queue_.emplace(QueuedPacket(priority, enqueue_time, enqueue_order,enqueue_times_.end(), std::move(packet)));UpdateQueueTime(enqueue_time);single_packet_queue_->SubtractPauseTime(pause_time_sum_);size_packets_ = 1;size_ += PacketSize(*single_packet_queue_);}
在 size_packets_ == 0,會放到single_packet_queue_
。而每取一個數據包,size_packets
設置0。對音頻包,一次采集周期內,20ms,只會產生一個包,而pacer的執行周期是5ms,音頻包始終會走入if (size_packets_ == 0)
為0的分支。
在取數據包時,在std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket
方法中,會有一個判斷語句,判斷音頻是否走pacer。
bool unpaced_audio_packet =!pace_audio_ && packet_queue_.LeadingAudioPacketEnqueueTime().has_value();
LeadingAudioPacketEnqueueTime()
是判斷single_packet_queue_
或streams_
是否有緩存音頻包(下一個包是否是音頻包)。
absl::optional<Timestamp> RoundRobinPacketQueue::LeadingAudioPacketEnqueueTime()const {if (single_packet_queue_.has_value()) {if (single_packet_queue_->Type() == RtpPacketMediaType::kAudio) {return single_packet_queue_->EnqueueTime();}return absl::nullopt;}if (stream_priorities_.empty()) {return absl::nullopt;}uint32_t ssrc = stream_priorities_.begin()->second;const auto& top_packet = streams_.find(ssrc)->second.packet_queue.top();if (top_packet.Type() == RtpPacketMediaType::kAudio) {return top_packet.EnqueueTime();}return absl::nullopt;
}
如果這個unpaced_audio_packet
變量的值為true,這不會走media_buget_
的機制,直接取出數據。
std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(const PacedPacketInfo& pacing_info,Timestamp target_send_time,Timestamp now) {if (packet_queue_.Empty()) {return nullptr;}// First, check if there is any reason _not_ to send the next queued packet.// Unpaced audio packets and probes are exempted from send checks.bool unpaced_audio_packet =!pace_audio_ && packet_queue_.LeadingAudioPacketEnqueueTime().has_value();bool is_probe = pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe;//不pace audioif (!unpaced_audio_packet && !is_probe) {if (Congested()) {// Don't send anything if congested.return nullptr;}if (mode_ == ProcessMode::kPeriodic) {if (media_budget_.bytes_remaining() <= 0) {// Not enough budget.RTC_LOG(LS_INFO) << "===> media budget not enough";return nullptr;}} else {// Dynamic processing mode.if (now <= target_send_time) {// We allow sending slightly early if we think that we would actually// had been able to, had we been right on time - i.e. the current debt// is not more than would be reduced to zero at the target sent time.TimeDelta flush_time = media_debt_ / media_rate_;if (now + flush_time > target_send_time) {return nullptr;}}}}//直接取出數據return packet_queue_.Pop();
}
在std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop()
方法中,走下面這個分支。
if (single_packet_queue_.has_value()) {//音頻包走這個分支RTC_DCHECK(stream_priorities_.empty());std::unique_ptr<RtpPacketToSend> rtp_packet(single_packet_queue_->RtpPacket());single_packet_queue_.reset();queue_time_sum_ = TimeDelta::Zero();size_packets_ = 0;size_ = DataSize::Zero();return rtp_packet;}
在只有音頻的情況下,音頻包只會入single_packet_queue_
,并且不會走media_buget_
的機制,每次時間片內都會馬上取出來發送出去,起到降低延遲的作用。
音視頻流的處理
PacingController::ProcessPackets
是每5ms跑一次(kPeriodic
模式)。視頻數據,一次會產生一批rtp包,在間隔周期內,會有多個包進入隊列。在size_packets_
為0時,包會進入single_packet_queue_
,不為0時進入包緩存隊列。在這個時候media_budget_
就起作用了。
音視頻流都存在的情況下,音頻包也不止會進入single_pakcet_queue_
了,這時音頻的加速就體現在std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket
上了,判斷為音頻包時,則不走media_buget_
機制,直接取出數據。
對非音頻包,則下面這個分支會起作用,限制包的發送。
if (mode_ == ProcessMode::kPeriodic) {if (media_budget_.bytes_remaining() <= 0) {// Not enough budget.RTC_LOG(LS_INFO) << "===> media budget not enough";return nullptr;}
}
IntervalBudget
原理
IntervalBudget
作用是根據當前PacedSender->Process
的調用時間間隔和當前目標碼率target bitrate
來計算出本次Process
理應發送的字節數。
比如當前碼率是100 000bps,本次Process
調用與上次調用間隔是20ms,則本次理應發送的字節數是100 bits per ms * 20 ms = 2000bits=250 bytes
。
250bytes為本次發送理應發送的字節數,但實際上視頻RTP包差不多是一個MTU大小。我們不可能真的發送250bytes的數據,因此可能會導致理應發送的數據量多或少的問題,如何解決這個問題呢?
IntervalBudget
中引入一個bytes_remaining_
的變量來記錄上次發送后,與理應發送數據量相比,多或少發了多少。其值為負表示上輪我們實際發送的比理應發送的數據量多了,我們本輪應該停止發送。其值為正表示我們上輪實際發送比理應發送的要少,還有富余。
工作原理
void set_target_rate_kbps(int target_rate_kbps);
設置總的可用量max_bytes_in_budget_
。
void IntervalBudget::set_target_rate_kbps(int target_rate_kbps) {target_rate_kbps_ = target_rate_kbps;max_bytes_in_budget_ = (kWindowMs * target_rate_kbps_) / 8;bytes_remaining_ = std::min(std::max(-max_bytes_in_budget_, bytes_remaining_),max_bytes_in_budget_);
}
target_rate_kbps
目標碼率,max_bytes_in_budget_
為半秒鐘可發送的碼率。
void IncreaseBudget(int64_t delta_time_ms);
根據毫秒數增加預算(增加的量計入bytes_remaining
),在kPeriodic
模式下,這個delta_time_ms的值為5ms。
void IntervalBudget::IncreaseBudget(int64_t delta_time_ms) {int64_t bytes = target_rate_kbps_ * delta_time_ms / 8;if (bytes_remaining_ < 0 || can_build_up_underuse_) {// We overused last interval, compensate this interval.bytes_remaining_ = std::min(bytes_remaining_ + bytes, max_bytes_in_budget_);} else {// If we underused last interval we can't use it this interval.bytes_remaining_ = std::min(bytes, max_bytes_in_budget_);}
}
void UseBudget(size_t bytes);
使用預算(bytes_remaining_
減去bytes)。
void IntervalBudget::UseBudget(size_t bytes) {bytes_remaining_ = std::max(bytes_remaining_ - static_cast<int>(bytes),-max_bytes_in_budget_);
}
UseBudget(size_t bytes)
更新用掉的數據量(就是已發送的數據量),如下調用堆棧
如果bytes_remaining_
小于0,那么當然不能在發數據了。
**padding_budget_**
的原理也一樣,它是用于計算padding的數據量。
碼率平滑的實現原理
發包的流程PacingController::ProcessPackets
放在一個線程中,會被定時觸發。被觸發后,會計算當前時間和上次被調用時間的時間差,然后將時間差參數傳入media_buget
(**IntervalBudget**
對象),media_buget_
算出當前時間片可以發送多少數據,然后從緩存隊列(**RoundBobinPacketQueue**
對象)中取出數據進行發送。
**media_buget_**
計算時間片發送多少字節的公式如下:
**delta time:**上次檢查時間點和這次檢查時間點的時間差。
target bitrate: pacer的參考碼率,是由probe模塊根據網絡探測帶寬評估出來。
remain_bytes: 每次觸發包時會減去發送報文的長度size,如果remain_bytes>0
,繼續從緩存隊列中取下一個報文進行發送,直到remain_bytes<=0
或者緩存隊列沒有更多的報文。
如果緩存隊列沒有更多待發的報文,但是**media_buget_**
(**IntervalBudget**
對象)計算出還可以發送更多的數據,這個時候pacer會進行padding報文補充。
四個用于控制發送碼率的方法:
**bool PacingController::Congested()**
**void PacingController::OnPacketSent**
** 底層socket發送的數據量的回調。**
**void PacingController::UpdateBudgetWithSentData(DataSize size)**
**void PacingController::UpdateOutstandingData(DataSize outstanding_data)**
碼率分配
數據包的優先級
前面就提到了緩存隊列是一個優先級隊列,對數據包會設置一個優先級,在每次插入數據時(PacingController::EnqueuePacket(...)
方法),都會調用GetPriorityForType(RtpPacketMediaType type)
,如下優先級:
int GetPriorityForType(RtpPacketMediaType type) {// Lower number takes priority over higher.switch (type) {case RtpPacketMediaType::kAudio:// Audio is always prioritized over other packet types.return kFirstPriority + 1;case RtpPacketMediaType::kRetransmission:// Send retransmissions before new media.return kFirstPriority + 2;case RtpPacketMediaType::kVideo:case RtpPacketMediaType::kForwardErrorCorrection:// Video has "normal" priority, in the old speak.// Send redundancy concurrently to video. If it is delayed it might have a// lower chance of being useful.return kFirstPriority + 3;case RtpPacketMediaType::kPadding:// Packets that are in themselves likely useless, only sent to keep the// BWE high.return kFirstPriority + 4;}RTC_CHECK_NOTREACHED();
}
在QueuedPacket
中的operator<(const RoundRobinPacketQueue::QueuedPacket& other)
會根據優先級確定QueuedPacket
在隊列中順序。priority值越小,代表優先級越高,如下,在QueuedPacket
中定義的bool operator<(const QueuedPacket& other) const
bool RoundRobinPacketQueue::QueuedPacket::operator<(const RoundRobinPacketQueue::QueuedPacket& other) const {if (priority_ != other.priority_)return priority_ > other.priority_;if (is_retransmission_ != other.is_retransmission_)return other.is_retransmission_;return enqueue_order_ > other.enqueue_order_;
}
● 優先值小的,排在前面。
● 優先級相同,非重傳包在前面。
● 優先級和重傳標志均相同,以入隊先后順序排列(enqueue_order_就是一個遞增的值)。