文章目錄
- 錯誤處理
- 常規用法
- 進階用法
- 原理
- 多路復用
- 元數據
- 負載均衡
- 壓縮數據
錯誤處理
gRPC 一般不在 message 中定義錯誤。畢竟每個 gRPC 服務本身就帶一個 error 的返回值,這是用來傳輸錯誤的專用通道。gRPC 中所有的錯誤返回都應該是 nil 或者 由 status.Status 產生的一個error。這樣error可以直接被調用方Client識別。
常規用法
當遇到一個go錯誤的時候,直接返回是無法被下游client識別的。
恰當的做法是:
調用 status.New 方法,并傳入一個適當的錯誤碼,生成一個 status.Status 對象
調用該 status.Err 方法生成一個能被調用方識別的error,然后返回
st := status.New(codes.NotFound, “some description”)
err := st.Err()
傳入的錯誤碼是 codes.Code 類型。
此外還有更便捷的辦法:使用 status.Error。它避免了手動轉換的操作。
err := status.Error(codes.NotFound, "some description")
進階用法
上面的錯誤有個問題,就是 code.Code 定義的錯誤碼只有固定的幾種,無法詳盡地表達業務中遇到的錯誤場景。
gRPC 提供了在錯誤中補充信息的機制:status.WithDetails 方法
Client 通過將 error 重新轉換位 status.Status ,就可以通過 status.Details 方法直接獲取其中的內容。
status.Detials 返回的是個slice, 是interface{}的slice,然而go已經自動做了類型轉換,可以通過斷言直接使用。
服務端示例
- 生成一個 status.Status 對象
- 填充錯誤的補充信息
// 生成一個 status.Status
st := status.New(codes.ResourceExhausted, "Request limit exceeded.")
// 填充錯誤的補充信息 WithDetails
ds, err := st.WithDetails(&epb.QuotaFailure{Violations: []*epb.QuotaFailure_Violation{{Subject: fmt.Sprintf("name:%s", in.Name),Description: "Limit one greeting per person",}},},
)
if err != nil {return nil, st.Err()
}
return nil, ds.Err()
客戶端的示例
- 調用RPC錯誤后,解析錯誤信息
- 通過斷言直接獲取錯誤詳情
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: "world"})
// 調用 RPC 如果遇到錯誤就對錯誤處理
if err != nil {// 轉換錯誤s := status.Convert(err)// 解析錯誤信息for _, d := range s.Details() {// 通過斷言直接使用switch info := d.(type) {case *epb.QuotaFailure:log.Printf("Quota failure: %s", info)default:log.Printf("Unexpected type: %s", info)}}
}
原理
這個錯誤是如何傳遞給調用方Client的呢?
是放到 metadata中的,而metadata是放到HTTP的header中的。
metadata是key:value格式的數據。錯誤的傳遞中,key是個固定值:grpc-status-details-bin。
而value,是被proto編碼過的,是二進制安全的。
目前大多數語言都實現了這個機制。
多路復用
同一臺服務器上的多個RPC服務的多路復用,比如同時保存一個訂單的存根、一個歡迎的存根因為多個RPC服務運行在一個服務端上,所以客戶端的多個存根之間是可以共享gRPC連接的
服務端代碼
func main() {lis, err := net.Listen("tcp", port)if err != nil {log.Fatalf("failed to listen: %v", err)}grpcServer := grpc.NewServer() // 注冊進訂單服務ordermgt_pb.RegisterOrderManagementServer(grpcServer, &orderMgtServer{}) // 注冊進歡迎服務hello_pb.RegisterGreeterServer(grpcServer, &helloServer{})
}
客戶端代碼
func main() {conn, err := grpc.Dial(address, grpc.WithInsecure())if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 訂單服務建立實例連接orderManagementClient := pb.NewOrderManagementClient(conn)ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()order1 := pb.Order{Id: "101", Items:[]string{"iPhone XS", "Mac Book Pro"}, Destination:"San Jose, CA", Price:2300.00}res, addErr := orderManagementClient.AddOrder(ctx, &order1)// 歡迎服務建立實例連接helloClient := hwpb.NewGreeterClient(conn)hwcCtx, hwcCancel := context.WithTimeout(context.Background(), time.Second)defer hwcCancel()helloResponse, err := helloClient.SayHello(hwcCtx, &hwpb.HelloRequest{Name: "gRPC Up and Running!"})fmt.Println("Greeting: ", helloResponse.Message)
}
元數據
在多個微服務的調用當中,信息交換常常是使用方法之間的參數傳遞的方式,但是在有些場景下,一些信息可能和 RPC 方法的業務參數沒有直接的關聯,所以不能作為參數的一部分,在 gRPC 中,可以使用元數據來存儲這類信息。
元數據創建
// 方法1
md := metadata.Pairs("1", "v1","1", "v2", // 方法1會把相同的鍵的字段合并,[ ]string{"v1","v2"}"2", "v3",)
// 方法2
md := metadata.New(map[string]string{"1":"v1","2":"v2"})
客戶端收發
在context中設置的元數據會轉換成線路層的gRPC頭信息和 trailer
客戶端發送這些頭信息,收件方會以頭信息的形式接收他們
// 創建元數據md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano),"kn", "vn",)// 創建新元數據的上下文,這種方法會替換掉已有的上下文mdCtx := metadata.NewOutgoingContext(context.Background(), md)// 這種方法是將元數據附加到已有的上下文ctxA := metadata.AppendToOutgoingContext(mdCtx, "k1", "v1", "k1", "v2", "k2", "v3")// 定義頭信息和 trailer,可以用來接收元數據var header, trailer metadata.MDorder1 := pb.Order{Id: "101", Items: []string{"iPhone XS", "Mac Book Pro"}, Destination: "San Jose, CA", Price: 2300.00}res, _ := client.AddOrder(ctxA, &order1, grpc.Header(&header), grpc.Trailer(&trailer))log.Print("AddOrder Response -> ", res.Value)// 獲取頭信息head, err := res.Header()// 獲取trailertrail, err := res.Trailer()
服務端收發
// 從上下文中獲取元數據列表
md, metadataAvailable := metadata.FromIncomingContext(ctx)if !metadataAvailable {return nil, status.Errorf(codes.DataLoss, "UnaryEcho: failed to get metadata")}
// 操作元數據邏輯if t, ok := md["timestamp"]; ok {fmt.Printf("timestamp from metadata:\n")for i, e := range t {fmt.Printf("====> Metadata %d. %s\n", i, e)}}// 創建元數據
header := metadata.New(map[string]string{"location": "San Jose", "timestamp": time.Now().Format(time.StampNano)})
// 發送頭信息
grpc.SendHeader(ctx, header)
trailer := metadata.Pairs("status","ok")
// 設置trailer
grpc.SetTrailer(ctx,trailer)
負載均衡
負載均衡器代理
也就是說后端的結構對gRPC客戶端是不透明的,客戶端只需要知道均衡器的斷點就可以了,比如NGINX代理、Envoy代理
客戶端負載均衡
func main(){roundrobinConn, err := grpc.Dial(address,grpc.WithBalancerName("round_robin"), // 指定負載均衡的算法// 默認是"pick_first",也就是從服務器列表中第一個服務端開始嘗試發送請求,成功則后續所有RPC都發往這個服務器// "round_robin"輪詢調度算法,連接所有地址,每次向后端發送一個RPCgrpc.WithInsecure(),)if err != nil {log.Fatalf("did not connect: %v", err)}defer roundrobinConn.Close()// 起10個RPC調度任務makeRPCs(roundrobinConn, 10)
}func makeRPCs(cc *grpc.ClientConn, n int) {hwc := ecpb.NewEchoClient(cc)for i := 0; i < n; i++ {callUnary(hwc)}
}func callUnary(c ecpb.EchoClient) {ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()}
壓縮數據
在服務端會對已注冊的壓縮器自動解碼,響應時自動編碼
始終從客戶端獲取指定的壓縮方法,如果沒被注冊就會返回Unimplemented
func main() {conn, err := grpc.Dial(address, grpc.WithInsecure())defer conn.Close()client := pb.NewOrderManagementClient(conn)ctx, cancel := context.WithTimeout(context.Background(), time.Second * 5)defer cancel()order1 := pb.Order{Id: "101", Items:[]string{"iPhone XS", "Mac Book Pro"}, Destination:"San Jose, CA", Price:2300.00}// 通過 grpc.UseCompressor(gzip.Name) 就可以輕松壓縮數據res, _ := client.AddOrder(ctx, &order1, grpc.UseCompressor(gzip.Name))
}