在前兩個版本中,我們實現了基礎的客戶端-服務端通信、連接池、序列化等關鍵模塊。為了進一步提升吞吐量和并發性能,本版本新增了 異步發送機制 和 多路復用支持,旨在減少資源消耗、提升連接利用率。
代碼地址:https://github.com/karatttt/MyRPC
版本三新增特性
異步發送機制實現
背景:
在同步RPC調用中,客戶端每發送一次請求都需阻塞等待響應,這在網絡抖動或響應較慢時會嚴重降低系統吞吐量。因此,本版本引入了 異步任務模型,支持超時重試、指數退避、完成回調等能力,確保在客戶端請求失敗后可以自動重試、不中斷主邏輯。
實現思路:
- 實際上異步回調的功能很好實現,只需要將回調方法傳入內部,當內部狀態為成功或者完成的時候調用該callback方法即可。
- 而異步超時失敗重試的機制實際上是讓客戶端的發送請求交由另一個協程來做,客戶端可以先執行其他的邏輯再阻塞等待future的結果,或者設置一個回調方法,或者不關心回復。異步發送實際上就是犧牲了可靠性,而重試是為了盡量提高這個可靠性。超時重試這個可以通過在協程內通過計時器重試,如果超時則在同一個協程中再進行一次發送,直到重試到大于最大重試次數。但是這樣會導致等待重試的協程數量太多,對于某一時間段網絡出現抖動的情況,出現了大量的重試,就會導致協程數劇增的情況。
- 借鑒了RocketMQ的異步發送的機制,采用了一個協程統一管理需要重試的任務,并用一個延時隊列來排序處理任務
Client的變動
為了區分同步發送,為異步增加了異步的proxy和異步的send方法
// 創建客戶端代理
func NewHelloAsyncClientProxy(opts ...client.Option) HelloAsyncClientProxy {return &HelloAsyncClientProxyImpl{client: client.DefaultClient,opts: opts,}
}
// 實現HelloAsync方法
func (c *HelloAsyncClientProxyImpl) HelloAsync(ctx context.Context, req *HelloRequest, opts ...client.Option) (*internel.Future, *common.RPCError) {msg := internel.NewMsg()msg.WithServiceName("helloworld")msg.WithMethodName("Hello")ctx = context.WithValue(ctx, internel.ContextMsgKey, msg)rsp := &HelloReply{}// 這里需要將opts添加前面newProxy時傳入的optsnewOpts := append(c.opts, opts...)return c.client.InvokeAsync(ctx, req, rsp, newOpts...)
}
- 這里是rpc.go中新增的代理以及實現方法,還沒有體現異步發送的邏輯,接下來看InvokeAsync
InvokeAsync
func (c *client) InvokeAsync(ctx context.Context, reqBody interface{}, rspBody interface{}, opt ...Option) (*internel.Future, *common.RPCError) {future := internel.NewFuture()opts := DefaultOptionsfor _, o := range opt {o(opts)}go func() {var task *async.Taskif opts.Timeout > 0 {// 有超時時間的情況下,無論是否進行重試,將任務提交給全局管理器ctx, msg := internel.GetMessage(ctx)task = &async.Task{MethodName: msg.GetMethodName(),Request: reqBody,MaxRetries: opts.RetryTimes,Timeout: opts.Timeout,ExecuteFunc: c.makeRetryFunc(ctx, reqBody, rspBody, opts),OnComplete: func(err error) {// 最終結果回調到原始Futureif err != nil {future.SetResult(nil, &common.RPCError{Code: common.ErrCodeRetryFailed,Message: err.Error(),})} else {future.SetResult(rspBody, nil)}},}// 提交任務到全局管理器task.Status = async.TaskStatusPendingfuture.Task = taskasync.GetGlobalTaskManager().AddTask(task)// 執行發送邏輯err := opts.ClientTransport.Send(ctx, reqBody, rspBody, opts.ClientTransportOption)if err == nil {future.SetResult(rspBody, nil)}} else {// 無超時時間的情況下,錯誤的話直接返回err := opts.ClientTransport.Send(ctx, reqBody, rspBody, opts.ClientTransportOption)if err == nil {future.SetResult(rspBody, nil)} else {future.SetResult(nil, &common.RPCError{Code: common.ErrCodeClient,Message: err.Error(),})}}}()return future, nil
}
- 我們先看看Future結構,再去理解上面的代碼:
type Future struct {mu sync.Mutexdone chan struct{}result interface{}err *common.RPCErrorcallbacks []func(interface{}, *common.RPCError)Task *async.Task // 關聯的異步任務}// SetResult 設置Future的結果func (f *Future) SetResult(result interface{}, err *common.RPCError) {f.mu.Lock()defer f.mu.Unlock()if f.isDone() {return}f.result = resultf.Task.Status = async.TaskStatusCompletedf.err = errclose(f.done)// 執行所有注冊的回調for _, callback := range f.callbacks {callback(result, err)}}
- 這個就是異步發送后返回的Future,result就是回包結果,callbacks就是客戶端設置的回調方法,Task是后續添加到全局異步管理器的任務,后續再說
- 而這個SetResult就是在得到結果后設置future的result,并且調用所有注冊的回調方法,并置Task.Status = async.TaskStatusCompleted,這個關于task的我們后面再說
- 接下來回到invokeAsync,對于沒有設置超時時間的發送,我們直接在失敗后返回客戶端(客戶端能夠忍受異步的丟失,如果真的發生了長時間的阻塞,也不用擔心這個協程不釋放,因為我們的連接池會管理這個連接的生命周期),對于設置了超時時間的發送,我們需要在超時時間到達后進行重試,或者達到最大重試次數后進行失敗反饋
- 這里就做了一個全局的管理器,先創建一個Task將其添加到manager中,再進行消息的正常發送。
TaskManager
// Task 表示一個異步任務
type Task struct {MethodName string // 方法名Request interface{} // 請求參數RetryTimes int // 當前已重試次數MaxRetries int // 最大重試次數Timeout time.Duration // 單次任務超時時間NextRetryAt time.Time // 下次重試時間(用于堆排序)ExecuteFunc func() error // 重試時任務執行函數Status TaskStatus // 狀態字段OnComplete func(error) // 最終完成回調mu sync.Mutex // 保證狀態變更的線程安全
}// 掃描循環(核心邏輯)
func (tm *TaskManager) scanLoop() {for {select {case <-tm.closeChan:returndefault:tm.processTasks()}}
}// 處理超時任務
func (tm *TaskManager) processTasks() {tm.mu.Lock()if tm.tasks.Len() == 0 {tm.mu.Unlock()// 無任務時休眠,直到被喚醒select {case <-tm.wakeChan:case <-time.After(10 * time.Second): // 防止長期阻塞}return}// 檢查堆頂任務是否超時now := time.Now()task := (*tm.tasks)[0]if now.Before(task.NextRetryAt) {// 未超時,休眠到最近任務到期tm.mu.Unlock()time.Sleep(task.NextRetryAt.Sub(now))return}// 彈出超時任務task = heap.Pop(tm.tasks).(*Task)tm.mu.Unlock()// 執行重試邏輯go tm.retryTask(task)
}// 重試任務
func (tm *TaskManager) retryTask(task *Task) {task.mu.Lock()// 檢查狀態:如果任務已結束,直接返回,不用再次入隊列if task.Status != TaskStatusPending {task.mu.Unlock()return}task.Status = TaskStatusRunning // 標記為執行中task.mu.Unlock()err := task.ExecuteFunc()if err == nil {task.OnComplete(nil)return}// 檢查是否達到最大重試次數task.RetryTimes++if task.RetryTimes > task.MaxRetries {// 打印fmt.Println("request retry times exceed max retry times")task.OnComplete(err)return}// 計算下次重試時間(如指數退避)delay := time.Duration(math.Pow(2, float64(task.RetryTimes))) * time.Secondtask.NextRetryAt = time.Now().Add(delay) // 重新加入隊列// 打印重試次數fmt.Println("request retry time : ", task.RetryTimes)tm.mu.Lock()heap.Push(tm.tasks, task)task.Status = TaskStatusPending // 恢復狀態tm.mu.Unlock()tm.notifyScanner()
}
- 以上是這個manager的關鍵代碼,這個Task就是里面的元素,按照下一次重試時間排序放在manager的一個延時隊列里面,優先處理目前需要重試的任務。task的ExecuteFunc我們在前面的方法中可以看到實際上就是retry發送,OnComplete就是將future的setResult使得客戶端能得到反饋
- 循環執行processTasks,對于堆頂任務進行retry
- retry時先看這個task是不是已經執行成功了,是的話刪除這個task,如果不是的話繼續入隊
- 這樣就可以保證只有一個協程在管理所有的超時任務,避免了每一個超時任務都需要一個協程來等待重試。
多路復用
背景:
- 默認情況下,每個RPC調用使用一個連接,連接池雖然能緩解資源浪費,對于連接池中的每一個連接,實際上也是串行進行的,也就是說,如果前面的某一個連接處理時間太長,后續的請求只能等待該請求返回后才能復用該連接,也就是http1.1的隊頭阻塞問題。
- 為此,引入 多路復用協議 —— 即在一個TCP連接內支持多個“邏輯流”,每個流由 RequestID 唯一標識,從而支持多個請求同時復用一條連接。
實現思路:
我們之前的frame結構如下:
header := FrameHeader{MagicNumber: MagicNumber,Version: Version,MessageType: MessageTypeRequest,SequenceID: sequenceID, ProtocolLength: uint32(len(protocolDataBytes)),BodyLength: uint32(len(reqData)),}
實際上已經有了SequenceID這個字段,也就是說,我們可以通過這個SequenceID,來區分同一個連接中的不同的流,也就是說,客戶端在同一個連接中,發送了不同的SequenceID的消息,服務端并發處理這些消息,并且保留這個SequenceID返回客戶端,客戶端的多個流識別這個SequenceID并讀取結果
MuxConn(多路復用連接)結構
// 實現net.Conn接口的結構體,保證適配連接池的get和put
// 實際上也是一個連接,只是多了reqID從而可以派生出多個流,區分達到多路復用的目的
type MuxConn struct {conn net.Conn // 原始連接pending map[uint32]*pendingRequest // 每一個reqID(流)對應的等待通道closeChan chan struct{}readerDone chan struct{}writeLock sync.MutexreqIDCounter uint64 // 分配遞增的請求IDmu sync.RWMutex
}
type pendingRequest struct {ch chan MuxFrametimeout time.Time
}
func (mc *MuxConn) NextRequestID() uint64 {return atomic.AddUint64(&mc.reqIDCounter, 1)
}
- 實際上這個MuxConn實現了net.Conn,也是一個連接,只是可以通過NextRequestID派生出多個流,并在這個conn上write特定reqID的請求
- 可以看到pending這個結構,是一個map,k是reqID,v是一個ch,為什么要設計一個這樣的map?因為我們可能同時存在多路并發,不同的客戶端的對于同一個conn的請求,我們需要設計一個特有的ch來讀取對應的reqID的響應是否到達,如果某一個reqID的響應到達了,發送到對應的ch,從而對應的客戶端得到響應。如果多個流直接并發讀取tcp的響應,必然會導致reqID亂序現象
connPool的改動
之前的連接池只是正常獲取一個連接,當該連接處理完被歸還后才置為空閑狀態。而對于多路復用顯然不是這個規則,對于正在使用的連接,若沒有達到最大可以接受的流的量,我們仍然可以接受從池中返回這個連接并使用
對于之前的獲取連接的邏輯,我們一次對于多路復用加入以下分支:
// 1. 優先檢查空閑連接if len(p.idleConns) > 0 {// 原邏輯。。。// 多路復用處理if p.isMux {if muxConn, exists := p.muxConns[conn]; exists {if p.streamCount[conn] < p.maxStreams {p.streamCount[conn]++MuxConn2SequenceIDMap[muxConn] = muxConn.NextRequestID()return muxConn, nil}}// 如果不是多路復用連接或已達最大流數,回退到普通連接}p.mu.Unlock()return &pooledConnWrapper{conn, p}, nil}// 2. 檢查是否可以創建新連接if int(atomic.LoadInt32(&p.activeCount)) < p.maxActive {// 原邏輯。。。// 多路復用連接初始化if p.isMux {if p.muxConns == nil {p.muxConns = make(map[*PooledConn]*mutilpath.MuxConn)p.streamCount = make(map[*PooledConn]int)}muxConn := mutilpath.NewMuxConn(rawConn, 1000)p.muxConns[pooledConn] = muxConnp.streamCount[pooledConn] = 1 // 新連接默認1個流MuxConn2SequenceIDMap[muxConn] = muxConn.NextRequestID()return muxConn, nil}p.mu.Unlock()return &pooledConnWrapper{pooledConn, p}, nil}// 3. 新增情況:無空閑且活躍連接達到最大數,檢查活躍連接的多路復用能力(僅在多路復用模式下)if p.isMux {for pc, muxConn := range p.muxConns {count := p.streamCount[pc]if count < p.maxStreams {p.streamCount[pc]++atomic.AddInt32(&p.activeCount, 1)pc.lastUsed = time.Now()MuxConn2SequenceIDMap[muxConn] = muxConn.NextRequestID()return p.muxConns[pc], nil}}}
- 對于情況一,若是空閑連接當然直接使用,并增加流數量,并對該連接分配reqID,在MuxConn2SequenceIDMap結構中保存
- 對于情況二,無空閑連接,但是活躍連接數未滿,創建新連接,增加流數量,并對該連接分配reqID,在MuxConn2SequenceIDMap結構中保存
- 對于情況三,無空閑連接且活躍連接數已經滿,檢查所有的活躍連接的流數量是否未滿,并且返回未滿的連接,分配新的流
- 對于Put邏輯,對應的應是歸還流,當某個連接的流為0時,該連接為空閑狀態,不再闡述
Send方法改動
之前的方法只需要send中正常序列化和編解碼就可以,客戶端發送完請求就阻塞(或者異步)等待響應,這里的多路復用模式則是在write前注冊一個pendingRequest,監聽特定的channel
// mux模式下,通過ch阻塞等待相應的流回包muxConn, _ := conn.(*mutilpath.MuxConn)seqID := msg.GetSequenceID()ch := muxConn.RegisterPending(seqID)defer muxConn.UnregisterPending(seqID)// 寫數據err = c.tcpWriteFrame(ctx, conn, framedata)if err != nil {return &common.RPCError{Code: common.ErrCodeNetwork,Message: fmt.Sprintf("failed to write frame: %v", err),}}// 讀響應select {case frame := <-ch:rspDataBuf = frame.Datacase <-ctx.Done():return &common.RPCError{Code: common.ErrCodeNetwork,Message: fmt.Sprintf("failed to read frame: %v", err),}}
- 而客戶端收到響應,路由到對應reqID的channel的邏輯在這里:
func (mc *MuxConn) readLoop() {defer close(mc.readerDone)for {select {case <-mc.closeChan:returndefault:}frame, err := codec.ReadFrame(mc.conn)if err != nil {// 協議錯誤處理fmt.Println("讀取幀錯誤:", err)break}mc.dispatchFrame(frame)}
}func (mc *MuxConn) dispatchFrame(frame []byte) {mc.mu.RLock()// 截取流序號sequenceID := binary.BigEndian.Uint32(frame[4:8])pr, exists := mc.pending[uint32(sequenceID)]mc.mu.RUnlock()frameStruct := MuxFrame{Data: frame,}if exists {select {case pr.ch <- frameStruct:// 成功發送到等待通道default:// 通道已滿,丟棄幀fmt.Println("丟棄幀 %s:通道已滿", frame)}} else {// 直接丟棄或打印日志fmt.Printf("收到未匹配的幀,sequenceID=%d,丟棄\n", sequenceID)}
}
總結
在已有基礎通信、連接池與序列化機制之上,通過引入異步發送機制與多路復用技術進一步提升RPC系統的吞吐量與并發性能,使得系統更加健壯。多路復用實際上也是http2.0實現的能力,這里相當于完成了http2.0的任務。以后的版本可以考慮對于性能再進行優化,如網絡框架的改進以及更高效的數據結構的使用