在追求高性能的分布式系統中,RPC 框架的底層網絡能力和數據傳輸效率起著決定性作用。經過幾輪迭代優化,我完成了第四版本的 RPC 框架。相比以往版本,這一版本的最大亮點在于 重寫了底層網絡框架 和 實現了發送端的數據聚集機制,這使得框架在高并發、高吞吐場景下表現更穩定、更高效。本文將重點介紹這兩個新功能的設計動機、技術選型與實現細節。
代碼倉庫:https://github.com/karatttt/MyRPC
版本四新增功能
重寫 Go 原生 net 庫
背景:
先說說go原生net的處理邏輯是:
每個 fd 對應?個 goroutine,業務?對 conn 發起主動的讀寫,底層使??阻塞 IO,當事件未就緒,將 fd 注冊(epoll_ctl)進 epoll fd,通過把 goroutine 設置(park)成 GWaiting 狀態。當有就緒事件后,喚醒(ready) 對應 goroutine 成 GRunnable 狀態------go會在調度goroutine時候執行epoll_wait系統調用,檢查是否有狀態發生改變的fd,有的話就把他取出,喚醒對應的goroutine去處理
在前三個版本中,我使用了 Go 原生的 net 庫作為 RPC 的通信基礎。雖然 Go 的網絡抽象簡單易用,但在構建高性能、低延遲的服務端系統時,它逐漸暴露出如下限制:
- 每一個連接必須需要一個協程,需要在協程中完成編解碼和序列化反序列化的操作,連接關閉或者網絡錯誤無法即時感知銷毀協程(go的調度模型使得連接和協程是一一對應的,因為非阻塞的Read實際上交由用戶調用,而調用的時機也同樣在該協程中發生)
- gonet原生網絡庫是ET模式,這意味著當可讀事件發生時,需要一次性的從緩沖區中讀出所有的數據,因為如果沒有讀完,事件不會在下一次的epollwait中喚醒(除非新數據到達該緩沖區),無法再次讀取。而這個循環讀取同樣也需要在用戶協程中處理
受 netpoll 和 tnet 等優秀項目的啟發,我決定基于 epoll(Linux)實現一套更底層、更靈活的網絡事件驅動模型,實際上以上兩個項目,并結合目前的RPC實現完整功能
實現思路:
對于第一個問題,可以借鑒netty的做法,分為Reactor線程和subReactor線程,他們都是poller線程,通過epoll_wait來監聽事件循環,但是reactor線程只負責監聽新連接,subReactor負責IO讀寫,并將業務處理交由線程池管理。
我們可以采集類似的做法,設置多個poller協程,并且讓IO讀寫(編解碼和序列化流程)交由poller線程處理,實際上的業務邏輯交由協程池處理,這樣的總的協程數量就是poller數量 + 協程池的協程數量
對于第二個問題,實際上前面的版本采取了長連接的做法來避免連接的頻繁建立和關閉,也就是服務端對每一個連接的readFrame是循環進行的(ET模式需要循環讀完數據),直到一定時間未收到數據關閉這個連接。但是對于多客戶端的情況,我們仍然會出現大量的連接,且每一個連接都需要阻塞直到到達最大空閑時間才主動關閉,就會導致連接過多(協程過多),我們希望使用LT模式,在讀取完一幀之后并通過業務協程池異步處理業務邏輯后,主動釋放協程,執行其他的協程
實際上目前的netpoll和tnet實現了類似的機制,但是他們都是提供了一個零拷貝接口由業務方調用,當融入RPC系統(往往需要反序列化的場景)后,零拷貝后的在緩沖區的數據,還會因為反序列化而進行到用戶態的拷貝,所以上面的零拷貝實際上適合的場景時proxy / 轉發場景,或者只關心字節數據的場景。所以我去除了零拷貝的設計,直接融入當前的RPC系統
PollerManager
type manager struct {polls []PollnumLoops int32pickIdx int32
}// Init 初始化并創建 poll 數組
func (m *manager) InitManager(numPolls int) error {fmt.Printf("Initializing poll manager with %d pollers\n", numPolls)if numPolls < 1 {numPolls = 1}atomic.StoreInt32(&m.numLoops, int32(numPolls))m.polls = make([]Poll, numPolls)for i := 0; i < numPolls; i++ {poll, err := NewDefaultPoll()if err != nil {fmt.Printf("Failed to create poller %d: %v\n", i, err)return err}m.polls[i] = pollgo poll.Wait()}return nil}
- 首先初始化一個pollerManager,來初始化多個可能的poller協程(最少一個),并且調用poll.wait開啟事件循環
poller相關操作
// Control implements Poll.
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {fd := operator.FDvar op intvar evt syscall.EpollEventp.setOperator(unsafe.Pointer(&evt.Fd), operator)switch event {case PollReadable: // server accept a new connection and wait readop, evt.Events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERRcase PollWritable: // client create a new connection and wait connect finishedop, evt.Events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERRcase PollDetach: // deregisterp.delOperator(operator)op, evt.Events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERRcase PollR2RW: // connection wait read/writeop, evt.Events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERRcase PollRW2R: // connection wait readop, evt.Events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR}evt.Fd = int32(fd)return EpollCtl(p.fd, op, fd, &evt)
}func (p *defaultPoll) Wait() error {events := make([]syscall.EpollEvent, 128)for {n, err := syscall.EpollWait(p.fd, events, -1)if err != nil {if err == syscall.EINTR {continue}return err}for i := 0; i < n; i++ {fd := int(events[i].Fd)op := p.operators[fd]if op == nil {continue}evt := events[i].Eventsif evt&(syscall.EPOLLIN|syscall.EPOLLPRI) != 0 && op.OnRead != nil {_ = op.OnRead(op.Conn)if op.Type == ConnectionType {// 關閉該事件,避免LT模式持續onRead_ = p.Control(op, PollDetach)}}if evt&(syscall.EPOLLOUT) != 0 && op.OnWrite != nil {_ = op.OnWrite(op)}}}
}
- 為了方便后面理解,這里先放出poller的相關操作,control就是注冊事件,wait就是進行事件循環,這里的wait,對于可讀事件,直接調用傳入的OnRead,如果是已存在連接的數據可讀,進行事件的關閉(不然這個實際上已經讀完的連接就會一直被喚醒。。。)
eventLoop
// Serve implements EventLoop.
func (evl *eventLoop) Serve(ln net.Listener) error {evl.Lock()evl.ln = lnfd, err := getListenerFD(ln)if err != nil {return err}operator := FDOperator{FD: int(fd),OnRead: evl.ListenerOnRead,Type: ListenerType, // 標記為監聽器類型}operator.poll = pollmanager.Pick()err = operator.Control(PollReadable)evl.Unlock()return err
}// 每一個事件循環中一定有listen連接的事件,當事件就緒的時候就調用這個函數
func (evl *eventLoop) ListenerOnRead(conn net.Conn) error {conn, err := evl.ln.Accept()if err != nil {// 非阻塞下 accept 沒有新連接時返回if ne, ok := err.(net.Error); ok && ne.Temporary() {// 臨時錯誤,繼續等待return nil}fmt.Println("Accept error:", err)return err}fmt.Printf("Accepted new connection: %s\n", conn.RemoteAddr())// 選擇 pollerpoller := pollmanager.Pick()if poller == nil {fmt.Println("No available poller")conn.Close()}// 獲取FDrawConn, ok := conn.(syscall.Conn)if !ok {// 不是 syscall.Conn,不能獲取 fd}var fd intsysRawConn, err := rawConn.SyscallConn()if err != nil {fmt.Println("Error getting syscall connection:", err)} else {err = sysRawConn.Control(func(f uintptr) {fd = int(f)})if err != nil {fmt.Println("Error getting file descriptor:", err)}}// 初始化連接OpConn := connection.InitConn(conn)fmt.Printf("Initialized connection with FD: %d\n", fd)// 創建 FDOperator 并注冊到 pollernewOp := &FDOperator{poll : poller,Conn: OpConn,FD: fd,OnRead: evl.opts.onRequest, // 這里傳入業務處理函數Type: ConnectionType, // 標記為連接類型}if err := poller.Control(newOp, PollReadable); err != nil {fmt.Println("Error registering connection:", err)conn.Close()}fmt.Printf("Registered new connection with FD: %d\n", fd)return nil
}
- 開啟了poller的wait,就要為其分配事件,也就是初始化這個eventLoop,這個server只需要執行一次,注冊一個listener監聽連接,并且定制一個OnRead()
- 這個OnRead實際上就是accept一個連接,然后為這個連接注冊一個可讀事件(Control)
ServerTransport
啟動server時,也需要一點改動,融入這個新的網絡框架
// serveTCP 處理 TCP 連接
func (t *serverTransport) serveTCP(ctx context.Context, ln net.Listener) error {//初始化事件循環eventLoop, err := poller.NewEventLoop(t.OnRequest)if err != nil {return fmt.Errorf("failed to create event loop: %w", err)}err = eventLoop.Serve(ln)if err != nil {return fmt.Errorf("failed to serve: %w", err)}return nil
}// handleConnection 處理單個連接
func (t *serverTransport) OnRequest(conn net.Conn) error {// 設置連接超時idleTimeout := 30 * time.Secondif t.opts != nil && t.opts.IdleTimeout > 0 {idleTimeout = t.opts.IdleTimeout}// 設置讀取超時conn.SetReadDeadline(time.Now().Add(idleTimeout))// 處理連接fmt.Printf("New connection from %s\n", conn.RemoteAddr())frame, err := codec.ReadFrame(conn)if err != nil {// 2. 如果讀取幀失敗,如客戶端斷開連接,則關閉連接if err == io.EOF {fmt.Printf("Client %s disconnected normally\n", conn.RemoteAddr())return err}// 3. 如果連接超時,超過設置的idletime,關閉連接if e, ok := err.(net.Error); ok && e.Timeout() {fmt.Printf("Connection from %s timed out after %v\n", conn.RemoteAddr(), idleTimeout)return err}// 4. 處理強制關閉的情況if strings.Contains(err.Error(), "forcibly closed") {fmt.Printf("Client %s forcibly closed the connection\n", conn.RemoteAddr())return err}fmt.Printf("Read error from %s: %v\n", conn.RemoteAddr(), err)return err}// 重置讀取超時conn.SetReadDeadline(time.Now().Add(idleTimeout))// 使用協程池處理請求,適用于多路復用模式frameCopy := frame // 創建副本避免閉包問題err = t.pool.Submit(func() {// 處理請求response, err := t.ConnHandler.Handle(context.Background(), frameCopy)if err != nil {fmt.Printf("Handle error for %s: %v\n", conn.RemoteAddr(), err)return}// 發送響應conn = conn.(netxConn.Connection) // 確保conn實現了Connection接口,調用聚集發包的接口if _, err := conn.Write(response); err != nil {fmt.Printf("Write response error for %s: %v\n", conn.RemoteAddr(), err)}})if err != nil {fmt.Printf("Submit task to pool error for %s: %v\n", conn.RemoteAddr(), err)// 協程池提交失敗,直接處理response, err := t.ConnHandler.Handle(context.Background(), frame)if err != nil {fmt.Printf("Handle error for %s: %v\n", conn.RemoteAddr(), err)}if _, err := conn.Write(response); err != nil {fmt.Printf("Write response error for %s: %v\n", conn.RemoteAddr(), err)return err}}return nil
}
- 可以看到serveTCP的適合啟動一個事件循環,并傳入一個OnRequest(作為事件就緒的時候的OnRead),當連接可讀的時候調用這個方法
- 這個OnRequest在一開始通過codec.ReadFrame(conn)讀取一個幀,這里只需要關心一個幀的原因是采取了LT模式,后續的沒有讀完的幀自然會再次喚醒,并且如果這里循環獲取了,一個是循環停止的界限不好控制(什么時候才算數據讀完?實際上的go的ioRead對于用戶層面是阻塞,但底層通過 運行時調度器 + 多線程(GMP) 實現了“偽非阻塞”,也就是可能當Read() 一個永遠沒有數據的連接,那么這個 goroutine 會一直阻塞掛起(休眠狀態),不會主動退出、不會被銷毀,),還有一個是會阻塞該poller協程,影響到其他事件的處理。
- 需要注意的是,業務處理必須要用協程池處理,避免阻塞poller協程
- 這樣就實現了讓poller線程處理IO,并且通過LT模式減少連接的優化
批量發包
背景
其實在io讀寫中,還有一個消耗性能的就是頻繁的系統調用,涉及到兩態數據間的拷貝。比如服務端回包的時候,每一次的回包都是一次系統調用,這里就是可以優化的地方。
所以可以通過批量的形式,來減少系統調用,也就是用一個緩沖區來實現發包的聚集效應,當實際發生系統調用時,將緩沖區的所有數據一并發出,而不是每一次有數據就發生系統調用。
實現思路:
為什么收包的時候不批量呢?前面的OnRequest中的IoRead實際上也是一次系統調用,如果這里要實現聚集效應批量收包,也就是每一次epoll喚醒后,先將數據存到緩沖區中(這里可以用零拷貝),然后這里OnRead來挖緩沖區(只涉及到一次系統調用),但是這樣帶來的問題是,需要在OnRead中解決半包粘包問題,且要為每一個連接單獨提供一個這樣的緩沖區(實際上這個形式的緩沖區是有的,也就是linkBuffer,大家感興趣可以去看看它的實現,但是它的主要功能還是為了提供零拷貝接口,只是為了批量收包而引入這個數據結構有點多余了。。。而且這個帶來的收益只是單個連接維度下的收包聚集,從而系統調用次數的減少,假如一個連接只有一次的數據傳輸,實際上還是每一次事件就緒就需要一次系統拷貝)
對于發包的時候的聚集,我們就可以在整個系統維度下,多個連接將包放到一個并發安全的隊列中,交由poller線程的寫事件來決定什么時候寫出,所以需要實現一個線程安全的隊列,以及批量發包的接口
func (r *Ring[T]) commit(seq uint32, val T) {item := &r.data[seq&r.mask]for {getSeq := atomic.LoadUint32(&item.getSeq)putSeq := atomic.LoadUint32(&item.putSeq)// Waiting for data to be ready for writing. Due to the separation of// obtaining the right to use the sequence number and reading and writing// data operations, there is a short period of time that the old data has// not been read, wait for the read operation to complete and set getSeq.if seq == putSeq && getSeq == putSeq {break}runtime.Gosched()}// Complete the write operation and set putSeq to the next expected write sequence number.item.value = valatomic.AddUint32(&item.putSeq, r.capacity)
}func (r *Ring[T]) consume(seq uint32) T {item := &r.data[seq&r.mask]for {getSeq := atomic.LoadUint32(&item.getSeq)putSeq := atomic.LoadUint32(&item.putSeq)// Waiting for data to be ready to read. Due to the separation of// obtaining the right to use the sequence number and reading and writing// data operations, there is a short period of time that the writing data has// not been written yet, wait for the writing operation to complete and set putSeq.if seq == getSeq && getSeq == (putSeq-r.capacity) {break}runtime.Gosched()}// Complete the read operation and set getSeq to the next expected read sequence number.val := item.valuevar zero Titem.value = zeroatomic.AddUint32(&item.getSeq, r.capacity)return val
}
- 以上的這個ringBuffer的借鑒了tent的實現,但是實際上它和LMAX Disruptor的思想是一致的,都是實現了無鎖化的并發安全隊列,主要是以上的兩個put和get的邏輯
- 舉一個例子:
每個槽位的 putSeq 和 getSeq 都初始化為槽位的下標:
slot[0]: putSeq=0, getSeq=0
slot[1]: putSeq=1, getSeq=1
slot[2]: putSeq=2, getSeq=2
slot[3]: putSeq=3, getSeq=3
第一次 Put(寫入):
寫入線程獲得 seq=1,即它準備寫入 slot[1]:
- 寫入 slot[1].value = val
然后執行:slot[1].putSeq += capacity → slot[1].putSeq = 1 + 4 = 5
現在:
slot[1]: putSeq=5, getSeq=1
表示這個槽位已經寫入完成,等待消費者讀取。
第一次 Get(讀取):
讀取線程獲得 seq=1,即從 slot[1] 讀數據:
- 消費成功后,執行:slot[1].getSeq += capacity → slot[1].getSeq = 1 + 4 = 5
現在:
slot[1]: putSeq=5, getSeq=5
說明這一輪(第1輪)讀寫都結束了,可以被下一輪復用。
第二輪 Put:
寫入線程再次獲得 seq=5(因為 tail 不斷遞增),這時還是映射到 slot[1],因為:
slotIndex = seq & (capacity - 1) = 5 & 3 = 1
此時:
- 它要判斷:seq == putSeq && getSeq == putSeq,才能繼續寫
- 此時 putSeq=5,getSeq=5,滿足條件
說明這個槽位已經被消費完了,可以再次復用來寫入!也就是說,這個序號的作用是為了分配到該槽位時,保證數據不被覆蓋,讀和寫都是安全的。
Buffer批量發包
func (b *Buffer) start() {initBufs := make(net.Buffers, 0, maxWritevBuffers)vals := make([][]byte, 0, maxWritevBuffers)bufs := initBufsdefer b.opts.handler(b)for {if err := b.getOrWait(&vals); err != nil {b.err = errbreak}for _, v := range vals {bufs = append(bufs, v)}vals = vals[:0]if _, err := bufs.WriteTo(b.w); err != nil {b.err = errbreak}// Reset bufs to the initial position to prevent `append` from generating new memory allocations.bufs = initBufs}
}func (b *Buffer) writeOrWait(p []byte) (int, error) {for {// The buffer queue stops receiving packets and returns directly.if b.isQueueStopped {return 0, b.err}// Write the buffer queue successfully, wake up the sending goroutine.if err := b.queue.Put(p); err == nil {b.wakeUp()return len(p), nil}// The queue is full, send the package directly.if err := b.writeDirectly(); err != nil {return 0, err}}
}
func (b *Buffer) getOrWait(values *[][]byte) error {for {// Check whether to be notified to close the outgoing goroutine.select {case <-b.done:return ErrAskQuitcase err := <-b.errCh:return errdefault:}// Bulk receive packets from the cache queue.size, _ := b.queue.Gets(values)if size > 0 {return nil}// Fast Path: Due to the poor performance of using select// to wake up the goroutine, it is preferred here to use Gosched()// to delay checking the queue, improving the hit rate and// the efficiency of obtaining packets in batches, thereby reducing// the probability of using select to wake up the goroutine.runtime.Gosched()if !b.queue.IsEmpty() {continue}// Slow Path: There are still no packets after the delayed check queue,// indicating that the system is relatively idle. goroutine uses// the select mechanism to wait for wakeup. The advantage of hibernation// is to reduce CPU idling loss when the system is idle.select {case <-b.done:return ErrAskQuitcase err := <-b.errCh:return errcase <-b.wakeupCh:}}
}
- 實現批量發包,只需要一開始對于這塊全局的buffer進行一個start,循環看隊列有沒有數據,有的話全量取出并write
- 寫的時候,調用writeOrWait這個接口,數據進ringBuffer就可以了
測試
server:
client:
總結
目前RPC先做到這了,以后還有什么優化或者有意思的再補充版本吧