《Go語言高級編程》玩轉RPC
一、客戶端 RPC 實現原理:異步調用機制
Go 的 RPC 客戶端支持同步和異步調用,核心在于 Client.Go
方法的實現:
1. 同步調用(Client.Call
)的本質
func (client *Client) Call(serviceMethod string, args, reply interface{}) error {// 通過 Client.Go 發起異步調用,阻塞等待結果call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Donereturn call.Error
}
同步調用本質是封裝了異步流程:創建調用請求后,通過通道阻塞等待結果返回。
2. 異步調用(Client.Go
)的流程
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {call := &Call{ServiceMethod: serviceMethod,Args: args,Reply: reply,Done: make(chan *Call, 10), // 帶緩沖通道,避免阻塞}client.send(call) // 線程安全地發送調用請求return call
}
異步調用返回 Call
對象,調用完成后通過 call.Done
通道通知結果:
func (call *Call) done() {select {case call.Done <- call: // 結果寫入通道default: // 通道滿時不阻塞(由調用方保證緩沖區足夠)}
}
3. 異步調用示例
func doClientWork(client *rpc.Client) {// 發起異步調用,不阻塞當前 goroutinecall := client.Go("HelloService.Hello", "hello", new(string), nil)// 執行其他任務...// 等待調用結果call = <-call.Doneif call.Error != nil {log.Fatal(call.Error)}fmt.Println("參數:", call.Args.(string), "響應:", *call.Reply.(*string))
}
核心優勢:異步調用允許客戶端在等待 RPC 結果時處理其他任務,提升并發性能。
二、基于 RPC 實現 Watch 監控功能
通過 RPC 實現實時監控(類似訂閱-發布模式),以 KV 存儲為例:
1. 服務端設計(KVStoreService
)
type KVStoreService struct {m map[string]string // KV 數據存儲filter map[string]func(string) // 監控過濾器列表mu sync.Mutex // 互斥鎖保護共享資源
}// 獲取 KV 值
func (p *KVStoreService) Get(key string, value *string) error {p.mu.Lock(); defer p.mu.Unlock()if v, ok := p.m[key]; ok {*value = vreturn nil}return errors.New("not found")
}// 設置 KV 值,并觸發監控回調
func (p *KVStoreService) Set(kv [2]string, reply *struct{}) error {p.mu.Lock(); defer p.mu.Unlock()key, value := kv[0], kv[1]if oldVal := p.m[key]; oldVal != value {for _, fn := range p.filter {fn(key) // 調用所有監控過濾器}}p.m[key] = valuereturn nil
}// 監控方法:注冊過濾器,等待 key 變化或超時
func (p *KVStoreService) Watch(timeout int, keyChanged *string) error {id := "watch-" + time.Now().Format("150405") + "-" + strconv.Itoa(rand.Intn(1000))ch := make(chan string, 10)p.mu.Lock()p.filter[id] = func(key string) { ch <- key } // 注冊過濾器p.mu.Unlock()select {case <-time.After(time.Duration(timeout) * time.Second):return errors.New("timeout")case key := <-ch:*keyChanged = keyreturn nil}
}
2. 客戶端調用
func doClientWork(client *rpc.Client) {// 啟動獨立 goroutine 執行監控,阻塞等待 key 變化go func() {var key stringif err := client.Call("KVStoreService.Watch", 30, &key); err != nil {log.Fatal(err)}fmt.Println("監控到變化的 key:", key)}()// 修改 KV 值,觸發監控回調if err := client.Call("KVStoreService.Set", [2]string{"abc", "new-value"}, new(struct{})); err != nil {log.Fatal(err)}time.Sleep(3 * time.Second)
}
核心原理:
- 服務端為每個
Watch
調用生成唯一 ID,綁定過濾器函數到filter
列表。 - 當
Set
方法修改數據時,遍歷調用所有過濾器,通過通道通知監控方。 - 客戶端通過異步 goroutine 阻塞監聽,實現實時監控。
三、反向 RPC:內網服務主動連接外網
傳統 RPC 是客戶端連接服務端,反向 RPC 則相反,適用于內網服務無法被外網直接訪問的場景:
1. 內網服務端(主動連接外網)
func main() {rpc.Register(new(HelloService)) // 注冊服務for {// 主動連接外網服務器conn, err := net.Dial("tcp", "外網IP:1234")if err != nil {time.Sleep(1 * time.Second)continue}// 基于連接提供 RPC 服務rpc.ServeConn(conn)conn.Close()}
}
2. 外網客戶端(監聽連接)
func main() {listener, err := net.Listen("tcp", ":1234")if err != nil {log.Fatal(err)}clientChan := make(chan *rpc.Client)// 后臺 goroutine 接受連接并創建客戶端go func() {for {conn, err := listener.Accept()if err != nil {log.Fatal(err)}clientChan <- rpc.NewClient(conn) // 將客戶端放入通道}}()doClientWork(clientChan) // 從通道獲取客戶端并調用
}func doClientWork(clientChan <-chan *rpc.Client) {client := <-clientChandefer client.Close()var reply stringif err := client.Call("HelloService.Hello", "hello", &reply); err != nil {log.Fatal(err)}fmt.Println(reply)
}
核心邏輯:
- 內網服務主動撥號外網服務器,建立連接后提供 RPC 服務。
- 外網客戶端監聽端口,接收連接并轉換為 RPC 客戶端,通過通道傳遞給業務邏輯。
- 適用于內網服務需被外網訪問,但內網無法暴露端口的場景(如防火墻限制)。
四、上下文信息:基于連接的定制化服務
為每個 RPC 連接添加上下文(如認證狀態、客戶端信息),提升服務安全性和靈活性:
1. 服務端改造(包含連接和狀態)
type HelloService struct {conn net.Conn // 連接對象,可獲取客戶端地址等信息isLogin bool // 登錄狀態
}// 登錄方法
func (p *HelloService) Login(request string, reply *string) error {if request != "user:password" {return errors.New("認證失敗")}log.Println("登錄成功")p.isLogin = true*reply = "登錄成功"return nil
}// 需要認證的 Hello 方法
func (p *HelloService) Hello(request string, reply *string) error {if !p.isLogin {return errors.New("請先登錄")}*reply = "hello:" + request + ", from " + p.conn.RemoteAddr().String()return nil
}
2. 服務端啟動邏輯(為每個連接創建獨立服務)
func main() {listener, err := net.Listen("tcp", ":1234")if err != nil {log.Fatal(err)}for {conn, err := listener.Accept()if err != nil {log.Fatal(err)}// 為每個連接啟動獨立 goroutine,綁定 HelloService 實例go func(c net.Conn) {defer c.Close()server := rpc.NewServer()server.Register(&HelloService{conn: c}) // 傳入連接對象server.ServeConn(c)}(conn)}
}
3. 客戶端調用流程
func main() {client, err := rpc.Dial("tcp", "localhost:1234")if err != nil {log.Fatal(err)}// 先登錄var loginReply stringif err := client.Call("HelloService.Login", "user:password", &loginReply); err != nil {log.Fatal("登錄失敗:", err)}// 再調用 Hello 方法var helloReply stringif err := client.Call("HelloService.Hello", "world", &helloReply); err != nil {log.Fatal("調用失敗:", err)}fmt.Println(helloReply) // 輸出包含客戶端地址的響應
}
核心優勢:
- 通過
net.Conn
獲取客戶端上下文(如 IP 地址、連接狀態)。 - 基于連接狀態實現認證邏輯(如登錄驗證),確保服務安全性。
- 每個連接獨立維護狀態,避免多客戶端數據混淆。
五、關鍵概念總結
- 異步調用:通過通道機制實現非阻塞 RPC 調用,提升客戶端并發能力。
- Watch 機制:利用函數回調和通道,實現服務端數據變化的實時通知。
- 反向 RPC:打破傳統 C/S 模式,適用于內網服務主動對外提供能力的場景。
- 上下文管理:基于連接綁定狀態(如認證信息),實現定制化服務邏輯。