這里我們來實現這個RPC的client端
為了實現RPC的效果,我們調用的Hello方法,即server端的方法,應該是由代理來調用,讓proxy里面封裝網絡請求,消息的發送和接受處理。而上一篇文章提到的服務端的代理已經在.rpc.go文件中實現,我們將客戶端的實現也寫在這里
ClientProxy
// 客戶端代理接口
type HelloClientProxy interface {Hello(ctx context.Context, in *HelloRequest, opts ...client.Option) (*HelloReply, error)
}// 客戶端代理實現
type HelloClientProxyImpl struct {client client.Clientopts []client.Option
}// 創建客戶端代理
func NewHelloClientProxy(opts ...client.Option) HelloClientProxy {return &HelloClientProxyImpl{client: client.DefaultClient,opts: opts,}
}
- 這里的HelloClientProxyImpl其中的client類主要是負責invoke方法,抽象網絡IO和編解碼,opts主要是記錄客戶端啟動時傳入的配置項,如server的ip地址等
- 創建出客戶端代理,我們就可以通過代理來調用Hello方法
// 實現Hello方法
func (c *HelloClientProxyImpl) Hello(ctx context.Context, req *HelloRequest, opts ...client.Option) (*HelloReply, error) {// 創建一個msg結構,存儲service相關的數據,如serviceName等,并放到context中// 用msg結構可以避免在context中太多withValue傳遞過多的參數msg := internel.NewMsg()msg.WithServiceName("helloworld")msg.WithMethodName("Hello")ctx = context.WithValue(ctx, internel.ContextMsgKey, msg)rsp := &HelloReply{}// 這里需要將opts添加前面newProxy時傳入的optsnewOpts := append(c.opts, opts...)err := c.client.Invoke(ctx, req, rsp, newOpts...)if err != nil {return nil, err}return rsp, nil
}
- 這里需要明確service的名字和對應方法,為了后續封裝在協議數據里,到達server端才能正確路由。當代理類實現了這個Hello后,我們就可以通過proxy.Hello得到相應結果,Invoke方法隱藏了具體的網絡處理,我們跟進Invoke方法
Client(clientTransPort)
上文提到,client類主要處理invoke方法,我們可以預見它的職責就是,
- 序列化請求體
- 編碼
- 發送請求,接受響應
- 解碼
- 反序列化響應體
- 返回客戶端
為了代碼的解耦,我們和server的處理一樣,將以上操作放到clientTransPort上,client持有transPort,讓transPort處理以上的邏輯
// 實現Send方法
func (c *clientTransport) Send(ctx context.Context, reqBody interface{}, rspBody interface{}, opt *ClientTransportOption) error {// 獲取連接// TODO 這里的連接后續可以優化從連接池獲取conn, err := net.Dial("tcp", opt.Address)if err != nil {return err}defer conn.Close()// reqbody序列化reqData, err := codec.Marshal(reqBody)if err != nil {return err}// reqbody編碼,返回請求幀framedata, err := opt.Codec.Encode(ctx, reqData)if err != nil {return err}// 寫數據到連接中err = c.tcpWriteFrame(ctx, conn, framedata)if err != nil {return err}// 讀取tcp幀rspDataBuf, err := c.tcpReadFrame(ctx, conn)if err != nil {return err}// 獲取msgctx, msg := internel.GetMessage(ctx)// rspDataBuf解碼,提取響應體數據rspData, err := opt.Codec.Decode(msg, rspDataBuf)if err != nil {return err}// 將rspData反序列化為rspBodyerr = codec.Unmarshal(rspData, rspBody)if err != nil {return err}return nil
}
-
序列化是根據protobuf協議,編碼的格式我們之間寫Server的時候提到,我們需要將數據編碼成以下格式:
-
當編碼完成后,我們就需要寫數據到連接中,并監聽該連接的數據,當有數據后,我們再依次解碼得到響應體,再將響應體反序列化,返回客戶端。
-
寫數據到連接和讀連接中的數據也很簡單,這里我們直接開啟一個連接,調用Write寫,而codec.ReadFrame在server端的時候已經介紹過
func (c *clientTransport) tcpWriteFrame(ctx context.Context, conn net.Conn, frame []byte) error {// 寫入tcp_, err := conn.Write(frame)if err != nil {return fmt.Errorf("write frame error: %v", err)}return nil
}func (c *clientTransport) tcpReadFrame(ctx context.Context, conn net.Conn) ([]byte, error) {return codec.ReadFrame(conn)
}
效果測試
至此client端處理完畢,我們來看看效果:
//client端的測試main.go:
func main() {c := pb.NewHelloClientProxy(client.WithTarget("127.0.0.1:8000"))if c == nil {fmt.Println("Failed to create client")return}rsp, err := c.Hello(context.Background(), &pb.HelloRequest{Msg: "world"})if err != nil {fmt.Println("RPC call error:", err)return}fmt.Println("Response:", rsp.Msg)
}// server端的測試的main.go
func main() {// Create a new server instances := server.NewServer()// Register the HelloService with the serverpb.RegisterHelloServer(s, &HelloServerImpl{})// Start the server on port 50051if err := s.Serve(":8000"); err != nil {panic(err)}fmt.Print("啟動成功")
}// 創建一個HelloServer的實現類
type HelloServerImpl struct{}
// 實現HelloServer接口的Hello方法
func (h *HelloServerImpl) Hello(req *pb.HelloRequest) (*pb.HelloReply, error) {// 這里可以實現具體的業務邏輯reply := &pb.HelloReply{Msg: "Hello " + req.Msg,}return reply, nil
}
server端啟動:
server端接收到client的連接請求:
client收到響應:
現在version1開發完了,目前的版本主要是實現基礎功能,并且為了考慮后續的擴展性做了比較多的解耦,在后面的版本,我們可以逐漸提升這個rpc的性能和融入更多的功能