用戶態網絡緩沖區
- 網絡緩沖區原理
- 為什么需要用戶態網絡緩沖區
- Linux下如何接收和發送數據包
- 用戶態網絡緩沖區設計的本質
- 網絡緩沖區代碼實現
網絡緩沖區原理
為什么需要用戶態網絡緩沖區
在網絡開發中,我們經常使用到read/write/recv/send
等系統調用接口,我們需要理解這些函數的本質還是個拷貝函數,我們以read
和write
為例,他們其實就是將數據從用戶空間拷貝到內核空間當中,內核態中是存在接收緩沖區和發送緩沖區的。
接下來我們來看read
和write
系統調用函數的含義:
ssize_t read(int fd, void *buf, size_t count);ssize_t write(int fd, const void *buf, size_t count);
對于read
和write
函數來說,都是有返回值的,返回值就代表我們實際上拷貝的數量,也就是我當前需要寫入到內核緩沖區當中的數量,而參數之一的count
所代表的就是預估的一個拷貝的數量。
當前我們就需要思考一個問題,用戶態的情況下,我們是不知道對應內核態的緩沖區有多大的,我們怎么能保證我們所需要拷貝的數據一次性就能拷貝過去而不是分好幾次進行拷貝的呢?如果一次性拷貝不完剩下的數據不就會被丟掉嗎,顯然是不行的,所以在這兒我們就需要去設置一個用戶態緩沖區來保存這些數據,保證其在沒有被完整拷貝之前不會被丟棄掉,這也是用戶態緩沖區所需要設置的一個重要的原因之一。
Linux下如何接收和發送數據包
我們都知道,網絡通信是圍繞著整個網絡通信協議棧的:
在我們用戶態看來,數據包就是 data,在 TCP 協議棧當中,是以 segment 表示,IP 協議中以 packet 表示,MAC 當中以 frame 表示,整個協議棧中,數據包都是以 sk_buffer 來進行流轉的,協議棧也只會去識別對應的 sk_buffer。
網絡數據其實整個流轉流程也是在上圖這樣一個狀態下進行流轉的,首先我們來看一下接收數據包的流程:
- 網卡接收到數據報,通過 DMA 將數據包寫入到內存(ringbuffer結構當中);
- 網卡向 CPU 發起硬件中斷,CPU 收到硬件中斷請求,根據中斷表查找中斷處理函數,調用中斷處理函數;
- 中斷處理函數將屏蔽硬件中斷,發起軟件中斷(硬件中斷是一個線程在執行,不能長時間被占用,避免 CPU 頻繁被網卡中斷,這兒需要使用軟件中斷處理耗時操作,避免執行時間過長,CPU 無法響應其他的硬件中斷);
- 內核專門線程負責軟件中斷,從 ringbuffer 當中將數據取出到 sk_buffer 當中(注意,這個是循環操作,直到 ringbuffer 中沒有數據);
- 從幀頭取出 IP 協議,判斷是 IPV4 還是 IPV6 ,去掉幀頭幀尾;
- 從 IP 頭中看出上一層是 TCP 協議還是 UDP 協議,根據五元組或者是 fd 找到對應的 socket ,將數據提取出來放到對應的 socket 接收緩沖區當中,軟件中斷處理結束以后開啟硬件中斷;
- 應用程序通過調用系統調用函數將接收緩沖區當中的數據拷貝到用戶的緩沖區當中。
在了解發送數據包的流程的流程時我們需要思考一個問題,UDP/TCP 協議的緩沖區是否一致?
我們要知道對于 UDP 協議來說,他是面向數據報的一種協議,也就是說用戶態下發送一個數據包,有多大使用 UDP 協議就會發多大,如果超過對應的長度 UDP 協議就會丟掉多余的部分,也不會重傳,這也就意味著 UDP 協議其實是用不到發送緩沖區的,我直接發送原始的數據包即可。但是接收緩沖區卻是必不可少的,因為接收數據的過程中我們可能存在一次性接收不完的情況發生,對應的數據就需要先被暫存下來。
再來看一下發送數據包的流程:
- 用戶態下調用系統調用函數將數據拷貝到 sk_buffer 當中并且將數據放到 socket 的發送緩沖區當中(TCP);
- 網絡協議棧從 socket 的發送緩沖區當中取出 sk_buffer 并且會克隆一個新的 sk_buffer(TCP是支持重傳機制的,克隆就是為了保證可以進行重傳);
- 根據協議棧向下進行傳遞,一次增加 TCP/UCP 頭部,IP 頭部,MAC幀頭,幀尾(TCP會進行分段,IP 會進行分片(TCP/UDP 都會));
- 觸發軟件中斷通知網卡驅動程序,有新的數據包需要進行發送;
- 網卡驅動程序依次從發送隊列中取出數據 sk_buffer 放到 ringbuffer 當中(內存 DMA 區域,網卡讀到);
- 觸發網卡發送,發送成功,觸發硬件中斷,釋放掉對應的 ringbuffer 和 sk_buffer(TCP 是克隆的,UDP 是原始的);
- 當收到 TCP 的 ACK 應答以后,就會釋放掉原始的 sk_buffer。
對于 TCP 協議來說,他的發送緩沖區是分段設計的,可以參考一下之前的一片文章TCP協議詳解,我們在了解了網絡數據包的接收和發送原理以后,我們再回來看發送緩沖區與接收緩沖區。
用戶態網絡緩沖區設計的本質
發送緩沖區
對于發送緩沖區來說,我們可以理解為生產者與消費者速度不一致的問題,生產者生產數據的速度如果大于消費者消費的速度,我們就需要保證生產者發送的數據被接收到,那我們就需要一個緩沖區將數據先保存下來,等待對端對數據進行處理。
另一個解決的問題就是用戶態本身不會知道內核中緩沖區有多大,并不是一次將數據都發送完畢,此時就需要預先將數據存儲起來,緩存那些沒有被發送出去的數據。
接收緩沖區
接收緩沖區同樣也是要去解決掉生產者的速度大于消費者速度的問題,跟發送緩沖區一致,而另一個要解決的問題就是粘包問題。
為什么會出現粘包問題?
對于用戶態來說,從內核的接收緩沖區當中讀取到的數據是不確定的,我們不能保證他就是一個完整的包,他可能是半個包,也可能是一個半的包,如果是半個包,我們就需要先將這個數據包保存下來,等到讀取到一個完整包數據以后在進行處理,如果是一個半包的數據,就需要優先去處理一個完整包的數據,將剩下半個包的數據暫存下來,基于這種考慮,就需要用到我們的用戶態接收緩沖區。
如何解決粘包問題?
解決粘包問題有兩種方式:
- 我們程序員自己去制定一套規則對數據包進行處理,比如說用特殊分隔符界定數據包(
\r\n
),我們再讀取到這個數據包的時候,如果讀取到的是\r\n
,就證明他之前的數據是一個完整的數據包,此時就進行處理即可; - 用長度去界定數據包,我們可以讓一個數據包的頭部分配兩個字節去保存一個完整的數據包的長度,我們在讀取數據的過程中只讀取這個長度的數據包,然后進行處理,就保證了我們處理的是一個完整的數據包。
網絡緩沖區代碼實現
實現一個用戶態的網路緩沖區,我們首先需要考慮什么樣的數據結構最為合適,第一種就是定長數組,固定長度。
如果使用定長數組的話,會存在的問題就在于:
- 空間大小不確定,會出現分分配空間不足或者是分配的空間太大了,導致空間浪費的現象發生;
- 會頻繁的進行數據的騰挪,因為我們讀取到一個完整的數據包以后,就需要將剩下的數據騰挪首部的位置,保證下一次的數據讀取。
接下來我們可以考慮 ringbuffer 這種環形隊列結構:
對于這種結構來說:
- 解決了數據騰挪的問題,因為他是循環的結構,但是他也是固定大小,伸縮性也會比較差,而且還會出現數據離散性的問題。
對于離散性,我們可以只用系統調用函數readv/writev
去解決掉,這兩個函數的作用就是用于將多個非連續的內存緩沖區中的數據一次性寫入文件描述符,解決掉數據不連續我們依然可以讀取到一個 buffer 中的問題。
對于伸縮性,我們可以使用 STL 容器中的 vector 來進行實現,他是可以進行擴容的,那么最終的一個數據結構就是一個 vector 加上 head 與 tail 兩個索引來進行設計。
#ifndef __MESSAGE_BUFFER__
#define __MESSAGE_BUFFER__#include <bits/types/struct_iovec.h>
#include <stdint.h>
#include <vector>
#include <cstring>
#include <sys/uio.h>
#include <errno.h>class MessageBuffer
{
public:MessageBuffer() : rpos_(0), wpos_(0){buffer_.resize(4096);}explicit MessageBuffer(std::size_t size) : rpos_(0), wpos_(0){buffer_.resize(size);}// 允許移動構造MessageBuffer(MessageBuffer &&other) noexcept: buffer_(std::move(other.buffer_)), rpos_(other.rpos_), wpos_(other.wpos_){other.rpos_ = 0;other.wpos_ = 0;}// 移動賦值MessageBuffer &operator=(MessageBuffer &&other) noexcept{if (this != &other){buffer_ = std::move(other.buffer_);wpos_ = other.wpos_;rpos_ = other.rpos_;other.wpos_ = 0;other.rpos_ = 0;}return *this;}// 獲取頭指針uint8_t* GetBasePointer(){return buffer_.data();}// 獲取讀指針uint8_t* GetReadPointer(){return buffer_.data() + rpos_;}// 獲取寫指針uint8_t* GetWritePointer(){return buffer_.data() + wpos_;}// 移動讀的下標void ReadCompleted(std::size_t size){rpos_ += size;}// 移動寫的下標void WriteCompleted(std::size_t size){wpos_ += size;}// 有效數據長度std::size_t GetActiveSize() const{return wpos_ - rpos_;}// 當前空閑空間,不需要騰挪數據std::size_t GetFreeSize() const{return buffer_.size() - wpos_;}// 整個buffer的大小std::size_t GetBufferSize() const{return buffer_.size();}// 騰挪數據void NormalSize(){if (rpos_ > 0) {std::memmove(buffer_.data(), buffer_.data() + rpos_, GetActiveSize());wpos_ -= rpos_;rpos_ = 0;}}// 確定當前空間是否足夠,盡可能的不去進行擴容和騰挪數據void EnsureSpace(std::size_t size){if (GetBufferSize() - GetActiveSize() < size) {buffer_.resize(buffer_.size() + std::max(size, buffer_.size() / 2));NormalSize();}else if (GetFreeSize() < size) {NormalSize();}}// 寫進用戶態緩沖區void Write(const uint8_t* data, std::size_t size){if (size > 0){EnsureSpace(size);std::memcpy(GetWritePointer(), data, size);WriteCompleted(size);}}// 獲取到所有的數據std::pair<uint8_t*, std::size_t> GetAllData(){return {GetReadPointer(), GetActiveSize()};}// 獲取第一個 \r\n 之前的數據的指針和大小(若未找到返回nullptr和0)std::pair<uint8_t *, std::size_t> GetDataUntilCRLF(){uint8_t* data = GetReadPointer();std::size_t active_size = GetActiveSize();for(size_t i = 0; i < active_size - 1; i++){if(data[i] == '\r' && data[i + 1] == '\n'){return {data, i};}}return {nullptr, 0};}// linux reactor readv// 1. 盡可能的不騰挪數據// 2. 避免了每次都從棧上拷貝到堆上int Recv(int fd, int* err){char extra[65535]; // UDP最大發送長度,大于這個長度需要在應用自己分層struct iovec iov[2];iov[0].iov_base = GetWritePointer();iov[0].iov_len = GetFreeSize();iov[1].iov_base = extra;iov[1].iov_len = 65535;// 通過readv讀去離散型數據int n = readv(fd, iov, 2);if (n < 0) {*err = errno;return n;} else if (n == 0) {*err = ECONNRESET;return 0;} else if (n < GetFreeSize()) {WriteCompleted(n);return n;} else {std::size_t extra_size = n - GetFreeSize();WriteCompleted(GetFreeSize());Write(reinterpret_cast<uint8_t*>(extra), extra_size);return n;}}/*char buffer[65535];int n = read(fd, buffer, 65535);if (n == 0) {// 斷開連接} else if (n < 0) (// ETif (errno == EINTR){}if (errno == EAGAIN I| errno == EWOULDBLOCK){//讀取數據時沒有數據可讀}else {// 發生錯誤}else {//讀取到數據Write(buffer, n);*/MessageBuffer(const MessageBuffer &) = delete;MessageBuffer &operator=(const MessageBuffer &) = delete;private:std::vector<uint8_t> buffer_;std::size_t rpos_;std::size_t wpos_;
};#endif
注意:
- 我們當前的設計當中,應該盡可能的去保證數據不進行騰挪和擴容,這個也是會產生消耗的;
- 我們從內核的接收緩沖區當中讀取數據時,一般情況下都會有一個操作,就是將對應的數據拷貝到我們的棧上,然后在讀到對應的用戶態緩沖區當中,這相當于是進行了兩次數據拷貝,在我們的設計當中,使用了
readv
函數,支持離散性數據拷貝,避免了兩次數據拷貝情況的發生,也保證了盡量不去騰挪數據的情況。
注意,我們這兒所談到的緩沖區是用戶態網絡緩沖區,跟內核的網絡緩沖區是存在區別的,這兩個概念是不可以進行混淆的。