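This is the fourth article in the kubelet series, part of the series on Kubernetes control plane components. It covers the basic concepts, workflow, and key features of gRPC, introduces how to install and use protobuf, walks through a sample gRPC server and client, and finally looks at how the cri-api project uses gRPC to define the CRI interface.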
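1. Why learn gRPC
- The previous article, "Kubernetes Control Plane Components: Kubelet Explained (3): The CRI Container Runtime Interface Layer", noted that CRI is a container runtime interface standard built on gRPC: kubelet talks to the actual runtime over gRPC, and the messages are encoded with protobuf. So what exactly is gRPC?
- This article works through the gRPC material needed to answer that.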
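2. RPC and gRPC overview
2.1. How application architecture has changed
2.1.1. Monolithic architecture
- If any one module crashes, the whole application becomes unavailable; fault isolation is poor.
- The application can only be scaled as a whole, which wastes resources; scalability is poor.
- All the code is coupled together; maintainability is poor.
2.1.2. Microservice architecture
- Microservices address the drawbacks of the monolith but introduce new problems:
  - Code duplication across services
  - Call dependencies between services
- What a microservice call involves:
  - Once services are split, a call becomes communication between processes, often across servers.
  - Each call goes over the network (for example via HTTP), and text-based HTTP/1.1 with JSON payloads adds noticeable overhead per request.
  - Solution: introduce RPC (Remote Procedure Call), which typically uses a compact, often custom, binary protocol on top of TCP to improve transfer efficiency. (gRPC itself runs on HTTP/2 with binary protobuf payloads.)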
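2.2. RPC overview
2.2.1. What RPC is
- RPC stands for Remote Procedure Call.
- Definition: a protocol that hides the details of calls in distributed computing, so that developers can call a remote function as if it were a local one.
2.2.2. How an RPC client and server communicate
- The client sends data, transmitted as a byte stream.
- The server receives and parses the data, then performs the corresponding operation according to a pre-agreed convention.
- The server returns the result to the client.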
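The three steps above can be sketched with Go's standard library package net/rpc, which is not gRPC but follows the same call pattern. This is a minimal illustration under stated assumptions: the Arith service, the Args type, and the callMultiply helper are hypothetical names, and an in-memory net.Pipe connection stands in for a real TCP connection.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// Args is the request payload (hypothetical type for this sketch).
type Args struct {
	A, B int
}

// Arith is a hypothetical service exposed over RPC.
type Arith struct{}

// Multiply has the shape net/rpc requires: an exported method taking the
// request and a pointer to the reply, and returning an error.
func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// callMultiply connects a client and a server over an in-memory connection
// and performs one remote call.
func callMultiply(a, b int) (int, error) {
	clientConn, serverConn := net.Pipe()

	srv := rpc.NewServer()
	if err := srv.Register(new(Arith)); err != nil {
		return 0, err
	}
	// Server side: decode the request bytes, run the method, encode the reply.
	go srv.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()

	var product int
	// Client side: reads like a local call, but the arguments travel as bytes.
	if err := client.Call("Arith.Multiply", &Args{A: a, B: b}, &product); err != nil {
		return 0, err
	}
	return product, nil
}

func main() {
	product, err := callMultiply(6, 7)
	if err != nil {
		fmt.Println("call failed:", err)
		return
	}
	fmt.Println(product)
}
```

The caller never touches sockets or byte encoding directly; that is the part an RPC framework takes over.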
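2.2.3. The core role of RPC
- Encapsulation: RPC wraps the communication process described above and simplifies it.
- Standardized protocol: communication follows a widely accepted protocol.
- Value: framework tooling built on RPC delivers economic benefit, directly or indirectly.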
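2.3. gRPC overview
2.3.1. What gRPC is
- Official definition
  - Original wording: A high-performance, open-source universal RPC framework
  - In other words, gRPC is a high-performance, open-source, general-purpose RPC framework.
- gRPC website: https://grpc.io/
- gRPC documentation in Chinese: https://doc.oschina.net/grpc
- Put simply, RPC is a communication model, and gRPC is one concrete implementation of it.
- The "g" in gRPC is commonly read as Google, where gRPC originated, not Go; gRPC supports many languages.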
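2.3.2. Core gRPC concepts
- Roles
  - Client: the caller
  - Server: the callee
- Service definition
  - A service is described in a language-neutral way, including:
    - the service name
    - the methods that can be called
    - the format of each method's parameters and return values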
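2.3.3. gRPC workflow
- Client side: call the predefined method directly and get the expected result; gRPC handles the underlying communication.
- Server side: implement the logic of the defined methods; gRPC handles the underlying communication.
2.3.4. Key gRPC features
- Contract-first interfaces
  - The service definition works like an interface: the server implements it, and the client calls a proxy object for the server's implementation.
  - Everything else, such as communication and serialization, is left to gRPC.
- Language independence
  - Cross-language calls are supported (for example a C++ server with Golang or Java clients).
  - Both the service definition and the encoding/decoding process are independent of the programming language.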
2.3.5. Installing gRPC (Go)
go get google.golang.org/grpc
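3. Protocol Buffers in detail
3.1. Protocol Buffers overview
3.1.1. What Protocol Buffers is
- Protocol Buffers, usually called protobuf, is a data format open-sourced by Google. It suits data transfer where performance and response time matter.
- gRPC uses Protocol Buffers to serialize and deserialize data.
- protobuf is a binary format, so data has to be encoded and decoded.
- Encoded protobuf data is not human-readable; it only becomes readable after deserialization.
- How to think about protobuf
  - Treat it as a code generator plus a serialization tool.
  - It converts your definitions into code for a specific language. A message type you define becomes a Go struct, and a method you define becomes a Go func.
  - When a request is sent or a response is received, it also does the encoding and decoding: outgoing data is encoded into a form gRPC can transmit, and incoming data is decoded into the target language's own data types.
- What serialization and deserialization mean
  - Serialization: converting a data structure or object into a sequence of bytes
  - Deserialization: converting the bytes produced by serialization back into a data structure or object
3.1.2. Advantages of protobuf
- Serialized output is much smaller than the equivalent JSON or XML, which suits network transfer.
- It is cross-platform and supports many languages.
- Message formats can evolve while staying reasonably compatible with older versions.
- Serialization and deserialization are fast.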
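To make the size claim concrete, the sketch below hand-encodes two fields the way the protobuf wire format lays them out: each field starts with a key, (field_number << 3) | wire_type, followed by a varint value or a length-prefixed byte string. It uses only the standard library; appendTag and encodePerson are hypothetical helper names, and real code would use the generated types with the protobuf runtime instead of encoding by hand.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
)

const (
	wireVarint = 0 // wire type for int32, int64, bool, enum
	wireBytes  = 2 // wire type for string, bytes, nested messages
)

// appendTag appends the field key: (field_number << 3) | wire_type.
func appendTag(buf []byte, fieldNum, wireType uint64) []byte {
	return binary.AppendUvarint(buf, fieldNum<<3|wireType)
}

// encodePerson encodes name as field 1 (string) and age as field 2 (varint).
// age is taken as unsigned here to keep the sketch short.
func encodePerson(name string, age uint64) []byte {
	var buf []byte
	buf = appendTag(buf, 1, wireBytes)
	buf = binary.AppendUvarint(buf, uint64(len(name))) // length prefix
	buf = append(buf, name...)
	buf = appendTag(buf, 2, wireVarint)
	buf = binary.AppendUvarint(buf, age)
	return buf
}

// jsonPerson is the JSON counterpart used for the size comparison.
type jsonPerson struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
}

func main() {
	pb := encodePerson("graham", 18)
	js, err := json.Marshal(jsonPerson{Name: "graham", Age: 18})
	if err != nil {
		fmt.Println("json encoding failed:", err)
		return
	}
	fmt.Printf("protobuf: % x (%d bytes)\n", pb, len(pb))
	fmt.Printf("json:     %s (%d bytes)\n", js, len(js))
}
```

The binary form carries field numbers instead of field names, which is where most of the saving comes from, and also why the encoded bytes are unreadable without the schema.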
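3.2. Installing the protobuf tools
3.2.1. Installing the protoc compiler
- Windows
  - Download the prebuilt binary protoc-*.zip: https://github.com/protocolbuffers/protobuf/releases
  - Extract it to a directory (for example C:\protobuf).
  - Add the bin directory to the PATH system environment variable.
- macOS
# Install with Homebrew
brew install protobuf
- Verify the installation
# Prints something like: libprotoc 29.3
protoc --version
3.2.2. Installing the Go code generators
- The examples that follow all use Go, so install protoc-gen-go, which generates Go files from .proto files, together with protoc-gen-go-grpc, which generates the gRPC client and server code.
- Generating code for another language requires that language's generator.
# Install protoc-gen-go and protoc-gen-go-grpc
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# Confirm the plugins are installed (make sure $GOPATH/bin is on PATH)
which protoc-gen-go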
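3.3. Writing a proto file
- The example below covers basic syntax, common types, and an rpc method.
- File name: person.proto
- Every field in a proto file is assigned a unique number. The number identifies the field in the binary encoding, so it should not be changed once the message is in use.
syntax = "proto3"; // use proto3 syntax

package example; // package name (namespace for the generated code)

option go_package = "github.com/yourusername/protobuf-example/person"; // import path of the generated Go code

// Enum definition
enum Gender {
  UNKNOWN = 0;
  MALE = 1;
  FEMALE = 2;
}

// Nested message example
message Address {
  string city = 1;
  string street = 2;
  int32 zip_code = 3;
}

// Main message
message Person {
  string name = 1;             // string type
  int32 age = 2;               // integer type
  Gender gender = 3;           // enum type
  repeated string hobbies = 4; // list type (repeated field)
  Address address = 5;         // nested message
}

// Optional: RPC service definition (if needed)
service PersonService {
  rpc GetPersonInfo (PersonRequest) returns (PersonResponse);
}

message PersonRequest {
  int32 person_id = 1;
}

message PersonResponse {
  Person person = 1;
}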
3.4. Generating Go code with protoc-gen-go
3.4.1. Installing the Go plugins
# Install the protoc-gen-go and protoc-gen-go-grpc plugins (the code generators)
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
# Confirm the plugins are installed (make sure $GOPATH/bin is on PATH)
which protoc-gen-go
3.4.2. Generating the Go message file pb.go
# Create the output directory
mkdir -p person
# Run protoc to generate the Go message code
#   --go_out: output directory
#   --go_opt=paths=source_relative: keep paths relative to the source file
protoc \
  --go_out=./person \
  --go_opt=paths=source_relative \
  person.proto
# Run protoc to generate the go-grpc code
protoc \
  --go-grpc_out=./person \
  --go-grpc_opt=paths=source_relative \
  person.proto
# If the error below appears, protoc-gen-go is not on PATH; add it as shown at the end
# protoc-gen-go: program not found or is not executable
# Please specify a program using absolute path or make sure the program is available in your PATH system variable
# --go_out: protoc-gen-go: Plugin failed with status code 1.
export PATH=$PATH:$HOME/go/bin
- Result:
  - File: person/person.pb.go
  - Contents: Go types such as Person, Address, and Gender, together with their serialization methods.
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v5.29.3
// source: person.proto

package person

import (
	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
	reflect "reflect"
	sync "sync"
	unsafe "unsafe"
)

const (
	// Verify that this generated code is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
	// Verify that runtime/protoimpl is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

// Enum definition
type Gender int32

const (
	Gender_UNKNOWN Gender = 0
	Gender_MALE Gender = 1
	Gender_FEMALE Gender = 2
)

// Enum value maps for Gender.
var (
	Gender_name = map[int32]string{
		0: "UNKNOWN",
		1: "MALE",
		2: "FEMALE",
	}
	Gender_value = map[string]int32{
		"UNKNOWN": 0,
		"MALE": 1,
		"FEMALE": 2,
	}
)

func (x Gender) Enum() *Gender {
	p := new(Gender)
	*p = x
	return p
}

func (x Gender) String() string {
	return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}

func (Gender) Descriptor() protoreflect.EnumDescriptor {
	return file_person_proto_enumTypes[0].Descriptor()
}

func (Gender) Type() protoreflect.EnumType {
	return &file_person_proto_enumTypes[0]
}

func (x Gender) Number() protoreflect.EnumNumber {
	return protoreflect.EnumNumber(x)
}

// Deprecated: Use Gender.Descriptor instead.
func (Gender) EnumDescriptor() ([]byte, []int) {
	return file_person_proto_rawDescGZIP(), []int{0}
}

// Nested message example
type Address struct {
	state protoimpl.MessageState `protogen:"open.v1"`
	City string `protobuf:"bytes,1,opt,name=city,proto3" json:"city,omitempty"`
	Street string `protobuf:"bytes,2,opt,name=street,proto3" json:"street,omitempty"`
	ZipCode int32 `protobuf:"varint,3,opt,name=zip_code,json=zipCode,proto3" json:"zip_code,omitempty"`
	unknownFields protoimpl.UnknownFields
	sizeCache protoimpl.SizeCache
}

func (x *Address) Reset() {
	*x = Address{}
	mi := &file_person_proto_msgTypes[0]
	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	ms.StoreMessageInfo(mi)
}

func (x *Address) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*Address) ProtoMessage() {}

func (x *Address) ProtoReflect() protoreflect.Message {
	mi := &file_person_proto_msgTypes[0]
	if x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use Address.ProtoReflect.Descriptor instead.
func (*Address) Descriptor() ([]byte, []int) {
	return file_person_proto_rawDescGZIP(), []int{0}
}

func (x *Address) GetCity() string {
	if x != nil {
		return x.City
	}
	return ""
}

func (x *Address) GetStreet() string {
	if x != nil {
		return x.Street
	}
	return ""
}

func (x *Address) GetZipCode() int32 {
	if x != nil {
		return x.ZipCode
	}
	return 0
}

// Main message
type Person struct {
	state protoimpl.MessageState `protogen:"open.v1"`
	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` // string type
	Age int32 `protobuf:"varint,2,opt,name=age,proto3" json:"age,omitempty"` // integer type
	Gender Gender `protobuf:"varint,3,opt,name=gender,proto3,enum=example.Gender" json:"gender,omitempty"` // enum type
	Hobbies []string `protobuf:"bytes,4,rep,name=hobbies,proto3" json:"hobbies,omitempty"` // list type (repeated field)
	Address *Address `protobuf:"bytes,5,opt,name=address,proto3" json:"address,omitempty"` // nested message
	unknownFields protoimpl.UnknownFields
	sizeCache protoimpl.SizeCache
}

func (x *Person) Reset() {
	*x = Person{}
	mi := &file_person_proto_msgTypes[1]
	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	ms.StoreMessageInfo(mi)
}

func (x *Person) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*Person) ProtoMessage() {}

func (x *Person) ProtoReflect() protoreflect.Message {
	mi := &file_person_proto_msgTypes[1]
	if x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use Person.ProtoReflect.Descriptor instead.
func (*Person) Descriptor() ([]byte, []int) {
	return file_person_proto_rawDescGZIP(), []int{1}
}

func (x *Person) GetName() string {
	if x != nil {
		return x.Name
	}
	return ""
}

func (x *Person) GetAge() int32 {
	if x != nil {
		return x.Age
	}
	return 0
}

func (x *Person) GetGender() Gender {
	if x != nil {
		return x.Gender
	}
	return Gender_UNKNOWN
}

func (x *Person) GetHobbies() []string {
	if x != nil {
		return x.Hobbies
	}
	return nil
}

func (x *Person) GetAddress() *Address {
	if x != nil {
		return x.Address
	}
	return nil
}

type PersonRequest struct {
	state protoimpl.MessageState `protogen:"open.v1"`
	PersonId int32 `protobuf:"varint,1,opt,name=person_id,json=personId,proto3" json:"person_id,omitempty"`
	unknownFields protoimpl.UnknownFields
	sizeCache protoimpl.SizeCache
}

func (x *PersonRequest) Reset() {
	*x = PersonRequest{}
	mi := &file_person_proto_msgTypes[2]
	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	ms.StoreMessageInfo(mi)
}

func (x *PersonRequest) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*PersonRequest) ProtoMessage() {}

func (x *PersonRequest) ProtoReflect() protoreflect.Message {
	mi := &file_person_proto_msgTypes[2]
	if x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use PersonRequest.ProtoReflect.Descriptor instead.
func (*PersonRequest) Descriptor() ([]byte, []int) {
	return file_person_proto_rawDescGZIP(), []int{2}
}

func (x *PersonRequest) GetPersonId() int32 {
	if x != nil {
		return x.PersonId
	}
	return 0
}

type PersonResponse struct {
	state protoimpl.MessageState `protogen:"open.v1"`
	Person *Person `protobuf:"bytes,1,opt,name=person,proto3" json:"person,omitempty"`
	unknownFields protoimpl.UnknownFields
	sizeCache protoimpl.SizeCache
}

func (x *PersonResponse) Reset() {
	*x = PersonResponse{}
	mi := &file_person_proto_msgTypes[3]
	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
	ms.StoreMessageInfo(mi)
}

func (x *PersonResponse) String() string {
	return protoimpl.X.MessageStringOf(x)
}

func (*PersonResponse) ProtoMessage() {}

func (x *PersonResponse) ProtoReflect() protoreflect.Message {
	mi := &file_person_proto_msgTypes[3]
	if x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use PersonResponse.ProtoReflect.Descriptor instead.
func (*PersonResponse) Descriptor() ([]byte, []int) {
	return file_person_proto_rawDescGZIP(), []int{3}
}

func (x *PersonResponse) GetPerson() *Person {
	if x != nil {
		return x.Person
	}
	return nil
}

var File_person_proto protoreflect.FileDescriptor

const file_person_proto_rawDesc = "" +
	"\n" +
	"\fperson.proto\x12\aexample\"P\n" +
	"\aAddress\x12\x12\n" +
	"\x04city\x18\x01 \x01(\tR\x04city\x12\x16\n" +
	"\x06street\x18\x02 \x01(\tR\x06street\x12\x19\n" +
	"\bzip_code\x18\x03 \x01(\x05R\azipCode\"\x9d\x01\n" +
	"\x06Person\x12\x12\n" +
	"\x04name\x18\x01 \x01(\tR\x04name\x12\x10\n" +
	"\x03age\x18\x02 \x01(\x05R\x03age\x12'\n" +
	"\x06gender\x18\x03 \x01(\x0e2\x0f.example.GenderR\x06gender\x12\x18\n" +
	"\ahobbies\x18\x04 \x03(\tR\ahobbies\x12*\n" +
	"\aaddress\x18\x05 \x01(\v2\x10.example.AddressR\aaddress\",\n" +
	"\rPersonRequest\x12\x1b\n" +
	"\tperson_id\x18\x01 \x01(\x05R\bpersonId\"9\n" +
	"\x0ePersonResponse\x12'\n" +
	"\x06person\x18\x01 \x01(\v2\x0f.example.PersonR\x06person*+\n" +
	"\x06Gender\x12\v\n" +
	"\aUNKNOWN\x10\x00\x12\b\n" +
	"\x04MALE\x10\x01\x12\n" +
	"\n" +
	"\x06FEMALE\x10\x022Q\n" +
	"\rPersonService\x12@\n" +
	"\rGetPersonInfo\x12\x16.example.PersonRequest\x1a\x17.example.PersonResponseB1Z/github.com/yourusername/protobuf-example/personb\x06proto3"

var (
	file_person_proto_rawDescOnce sync.Once
	file_person_proto_rawDescData []byte
)

func file_person_proto_rawDescGZIP() []byte {
	file_person_proto_rawDescOnce.Do(func() {
		file_person_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_person_proto_rawDesc), len(file_person_proto_rawDesc)))
	})
	return file_person_proto_rawDescData
}

var file_person_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_person_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_person_proto_goTypes = []any{
	(Gender)(0), // 0: example.Gender
	(*Address)(nil), // 1: example.Address
	(*Person)(nil), // 2: example.Person
	(*PersonRequest)(nil), // 3: example.PersonRequest
	(*PersonResponse)(nil), // 4: example.PersonResponse
}
var file_person_proto_depIdxs = []int32{
	0, // 0: example.Person.gender:type_name -> example.Gender
	1, // 1: example.Person.address:type_name -> example.Address
	2, // 2: example.PersonResponse.person:type_name -> example.Person
	3, // 3: example.PersonService.GetPersonInfo:input_type -> example.PersonRequest
	4, // 4: example.PersonService.GetPersonInfo:output_type -> example.PersonResponse
	4, // [4:5] is the sub-list for method output_type
	3, // [3:4] is the sub-list for method input_type
	3, // [3:3] is the sub-list for extension type_name
	3, // [3:3] is the sub-list for extension extendee
	0, // [0:3] is the sub-list for field type_name
}

func init() { file_person_proto_init() }
func file_person_proto_init() {
	if File_person_proto != nil {
		return
	}
	type x struct{}
	out := protoimpl.TypeBuilder{
		File: protoimpl.DescBuilder{
			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
			RawDescriptor: unsafe.Slice(unsafe.StringData(file_person_proto_rawDesc), len(file_person_proto_rawDesc)),
			NumEnums: 1,
			NumMessages: 4,
			NumExtensions: 0,
			NumServices: 1,
		},
		GoTypes: file_person_proto_goTypes,
		DependencyIndexes: file_person_proto_depIdxs,
		EnumInfos: file_person_proto_enumTypes,
		MessageInfos: file_person_proto_msgTypes,
	}.Build()
	File_person_proto = out.File
	file_person_proto_goTypes = nil
	file_person_proto_depIdxs = nil
}
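One detail of the generated getters is worth a closer look: each one checks the receiver for nil, so calls can be chained safely even when a message or a nested message is unset. The sketch below reproduces that pattern with trimmed-down stand-ins for the generated Person and Address types (only the fields needed here are kept; it is not the generated code itself).

```go
package main

import "fmt"

// Address and Person are reduced stand-ins for the generated structs.
type Address struct {
	City string
}

type Person struct {
	Name    string
	Address *Address
}

// GetAddress mirrors the generated getter: safe to call on a nil *Person.
func (x *Person) GetAddress() *Address {
	if x != nil {
		return x.Address
	}
	return nil
}

// GetCity mirrors the generated getter: safe to call on a nil *Address.
func (x *Address) GetCity() string {
	if x != nil {
		return x.City
	}
	return ""
}

func main() {
	var missing *Person // for example, an unset Person in a response
	// No nil check needed: the chain yields the zero value instead of panicking.
	fmt.Printf("%q\n", missing.GetAddress().GetCity())

	p := &Person{Name: "graham", Address: &Address{City: "ShangHai"}}
	fmt.Printf("%q\n", p.GetAddress().GetCity())
}
```

Accessing the fields directly (missing.Address.City) would panic in the first case, which is why the getters are usually preferred when reading nested messages.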
3.4.3. Generating the go-grpc file _grpc.pb.go
# Run protoc to generate the go-grpc code
protoc \
  --go-grpc_out=./person \
  --go-grpc_opt=paths=source_relative \
  person.proto
# If an error like the one below appears, the plugin is not on PATH; add it as shown at the end
# protoc-gen-go: program not found or is not executable
# Please specify a program using absolute path or make sure the program is available in your PATH system variable
# --go_out: protoc-gen-go: Plugin failed with status code 1.
export PATH=$PATH:$HOME/go/bin
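- Result:
  - File: person/person_grpc.pb.go
  - Contents:
    - The PersonServiceClient and PersonServiceServer interfaces for the client and server sides, intended for external use.
    - Generated implementations of those interfaces, such as UnimplementedPersonServiceServer. The server logic is normally written by hand rather than using the generated one directly.
    - RegisterPersonServiceServer, a generated function that registers this gRPC server implementation with gRPC's internal registry. It is used later when writing the server.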
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v5.29.3
// source: person.proto

package person

import (
	context "context"
	grpc "google.golang.org/grpc"
	codes "google.golang.org/grpc/codes"
	status "google.golang.org/grpc/status"
)

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9

const (
	PersonService_GetPersonInfo_FullMethodName = "/example.PersonService/GetPersonInfo"
)

// PersonServiceClient is the client API for PersonService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
//
// Optional: RPC service definition (if needed)
type PersonServiceClient interface {
	GetPersonInfo(ctx context.Context, in *PersonRequest, opts ...grpc.CallOption) (*PersonResponse, error)
}

type personServiceClient struct {
	cc grpc.ClientConnInterface
}

func NewPersonServiceClient(cc grpc.ClientConnInterface) PersonServiceClient {
	return &personServiceClient{cc}
}

func (c *personServiceClient) GetPersonInfo(ctx context.Context, in *PersonRequest, opts ...grpc.CallOption) (*PersonResponse, error) {
	cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	out := new(PersonResponse)
	err := c.cc.Invoke(ctx, PersonService_GetPersonInfo_FullMethodName, in, out, cOpts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// PersonServiceServer is the server API for PersonService service.
// All implementations must embed UnimplementedPersonServiceServer
// for forward compatibility.
//
// Optional: RPC service definition (if needed)
type PersonServiceServer interface {
	GetPersonInfo(context.Context, *PersonRequest) (*PersonResponse, error)
	mustEmbedUnimplementedPersonServiceServer()
}

// UnimplementedPersonServiceServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedPersonServiceServer struct{}

func (UnimplementedPersonServiceServer) GetPersonInfo(context.Context, *PersonRequest) (*PersonResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method GetPersonInfo not implemented")
}
func (UnimplementedPersonServiceServer) mustEmbedUnimplementedPersonServiceServer() {}
func (UnimplementedPersonServiceServer) testEmbeddedByValue() {}

// UnsafePersonServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to PersonServiceServer will
// result in compilation errors.
type UnsafePersonServiceServer interface {
	mustEmbedUnimplementedPersonServiceServer()
}

func RegisterPersonServiceServer(s grpc.ServiceRegistrar, srv PersonServiceServer) {
	// If the following call pancis, it indicates UnimplementedPersonServiceServer was
	// embedded by pointer and is nil. This will cause panics if an
	// unimplemented method is ever invoked, so we test this at initialization
	// time to prevent it from happening at runtime later due to I/O.
	if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
		t.testEmbeddedByValue()
	}
	s.RegisterService(&PersonService_ServiceDesc, srv)
}

func _PersonService_GetPersonInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(PersonRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(PersonServiceServer).GetPersonInfo(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server: srv,
		FullMethod: PersonService_GetPersonInfo_FullMethodName,
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(PersonServiceServer).GetPersonInfo(ctx, req.(*PersonRequest))
	}
	return interceptor(ctx, in, info, handler)
}

// PersonService_ServiceDesc is the grpc.ServiceDesc for PersonService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var PersonService_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "example.PersonService",
	HandlerType: (*PersonServiceServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "GetPersonInfo",
			Handler: _PersonService_GetPersonInfo_Handler,
		},
	},
	Streams: []grpc.StreamDesc{},
	Metadata: "person.proto",
}
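The generated comments say that server implementations must embed UnimplementedPersonServiceServer "for forward compatibility". The standalone sketch below illustrates what that buys, using a hypothetical GreeterServer interface in place of the generated one: when a method is added to the service later, an implementation that embeds the stub still compiles, and the new method simply reports that it is not implemented.

```go
package main

import (
	"errors"
	"fmt"
)

var errUnimplemented = errors.New("method not implemented")

// GreeterServer mimics a generated server interface (hypothetical service).
// Imagine Bye was added in a later version of the proto file.
type GreeterServer interface {
	Hello(name string) (string, error)
	Bye(name string) (string, error)
	mustEmbedUnimplementedGreeterServer()
}

// UnimplementedGreeterServer mimics the generated stub: every method exists
// and returns an "unimplemented" error.
type UnimplementedGreeterServer struct{}

func (UnimplementedGreeterServer) Hello(string) (string, error) { return "", errUnimplemented }
func (UnimplementedGreeterServer) Bye(string) (string, error)   { return "", errUnimplemented }
func (UnimplementedGreeterServer) mustEmbedUnimplementedGreeterServer() {}

// myServer was written before Bye existed; it overrides only Hello.
type myServer struct {
	UnimplementedGreeterServer // embedded by value, as the generated comment advises
}

func (myServer) Hello(name string) (string, error) {
	return "hello " + name, nil
}

func main() {
	// Still satisfies the interface after Bye was added.
	var srv GreeterServer = myServer{}

	msg, _ := srv.Hello("graham")
	fmt.Println(msg)

	_, err := srv.Bye("graham")
	fmt.Println(err)
}
```

Embedding by value rather than by pointer avoids a nil pointer dereference when one of the stub's methods is called, which is what the testEmbeddedByValue check in the generated register function guards against.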
4. Writing a gRPC client and server
Project layout:
.
├── client
│   └── client.go
├── proto
│   ├── person
│   │   ├── person.pb.go
│   │   └── person_grpc.pb.go
│   └── person.proto
└── server
    └── server.go
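4.1. Writing the gRPC server
4.1.1. Server logic
- Create the gRPC Server object, the core abstraction on the server side.
- Register the service implementation with the gRPC Server's internal registry, so that an incoming request can be matched to the registered service and handed to its handler.
- Listen on a TCP port to establish the network channel.
- Start the gRPC Server so that it accepts requests and keeps serving client connections.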
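The registration step is easier to picture with a simplified model. The sketch below is not grpc-go's actual implementation; it only illustrates the idea that registration stores a handler under a full method name of the form /package.Service/Method (the same form as the generated PersonService_GetPersonInfo_FullMethodName constant), and that each incoming request is dispatched by looking that name up. The registry and handler types are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a simplified stand-in for a generated method handler.
type handler func(req string) (string, error)

// registry maps a full method name to its handler (hypothetical type).
type registry struct {
	handlers map[string]handler
}

func newRegistry() *registry {
	return &registry{handlers: make(map[string]handler)}
}

// register stores a handler under "/<service>/<method>".
func (r *registry) register(service, method string, h handler) {
	r.handlers["/"+service+"/"+method] = h
}

// dispatch looks up the handler for an incoming request and runs it.
func (r *registry) dispatch(fullMethod, req string) (string, error) {
	h, ok := r.handlers[fullMethod]
	if !ok {
		return "", errors.New("unimplemented: " + fullMethod)
	}
	return h(req)
}

func main() {
	reg := newRegistry()
	reg.register("example.PersonService", "GetPersonInfo", func(req string) (string, error) {
		return "person for id " + req, nil
	})

	resp, err := reg.dispatch("/example.PersonService/GetPersonInfo", "1")
	fmt.Println(resp, err)

	_, err = reg.dispatch("/example.PersonService/Missing", "1")
	fmt.Println(err)
}
```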
4.1.2. Implementation
package main

import (
	"context"
	"fmt"
	"net"

	"google.golang.org/grpc"

	pb "test-project/rpc/proto/person"
)

// server is our own implementation of the gRPC server side. It embeds the
// stub generated in the _grpc.pb.go file instead of using that stub directly.
type server struct {
	pb.UnimplementedPersonServiceServer
}

func (s *server) GetPersonInfo(ctx context.Context, req *pb.PersonRequest) (*pb.PersonResponse, error) {
	fmt.Printf("get person info by id: %v\n", req.GetPersonId())
	return &pb.PersonResponse{
		Person: &pb.Person{
			Name:    "graham",
			Age:     18,
			Gender:  pb.Gender_MALE,
			Hobbies: nil,
			Address: &pb.Address{
				City:    "ShangHai",
				Street:  "waitan",
				ZipCode: 1,
			},
		},
	}, nil
}

func main() {
	// Open the listening port
	listen, err := net.Listen("tcp", ":8090")
	if err != nil {
		fmt.Printf("failed to listen: %v\n", err)
		return
	}
	// Create the gRPC server. No security is enabled here; add
	// authentication before using this in production.
	grpcServer := grpc.NewServer()
	// Register our own service implementation with the gRPC server
	pb.RegisterPersonServiceServer(grpcServer, &server{})
	// Start serving
	if err := grpcServer.Serve(listen); err != nil {
		fmt.Printf("failed to serve: %v\n", err)
		return
	}
}
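4.2. Writing the gRPC client
4.2.1. Client logic
- Create a connection to the given target (the server).
- Create a client object for the service.
- Send the RPC request and wait synchronously for the response.
- Print the response.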
4.2.2. Implementation
package main

import (
	"context"
	"fmt"
	"os"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "test-project/rpc/proto/person"
)

func main() {
	// Create a connection to the gRPC server. It is unencrypted and
	// unauthenticated here; add authentication before using this in production.
	// Newer grpc-go releases recommend grpc.NewClient instead of grpc.Dial.
	conn, err := grpc.Dial("127.0.0.1:8090", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		fmt.Printf("did not connect: %v\n", err)
		os.Exit(1)
	}
	defer func() {
		if err := conn.Close(); err != nil {
			fmt.Printf("did not close connect: %v\n", err)
			return
		}
	}()

	// Create a client object from the connection (provided by the generated *_grpc.pb.go)
	client := pb.NewPersonServiceClient(conn)

	// Call the GetPersonInfo gRPC method through the client
	resp, err := client.GetPersonInfo(context.Background(), &pb.PersonRequest{PersonId: 1})
	if err != nil {
		fmt.Printf("call grpc func GetPersonInfo failed: %v\n", err)
		return
	}
	fmt.Println(resp)
}
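5. The CRI gRPC implementation
With the gRPC basics covered, the basic implementation of CRI should now be readable.
5.1.1. The CRI proto file
- GitHub: https://github.com/kubernetes/cri-api
- Path: staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.proto
- The file is long, so only the two service definitions are shown here: RuntimeService and ImageService.
- They define many rpc methods for a concrete server to implement.
- kubelet calls the interface as the client, and containerd implements it as the server.
......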
// Runtime service defines the public APIs for remote container runtimes
service RuntimeService {
    // Version returns the runtime name, runtime version, and runtime API version.
    rpc Version(VersionRequest) returns (VersionResponse) {}

    // RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
    // the sandbox is in the ready state on success.
    rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}
    // StopPodSandbox stops any running process that is part of the sandbox and
    // reclaims network resources (e.g., IP addresses) allocated to the sandbox.
    // If there are any running containers in the sandbox, they must be forcibly
    // terminated.
    // This call is idempotent, and must not return an error if all relevant
    // resources have already been reclaimed. kubelet will call StopPodSandbox
    // at least once before calling RemovePodSandbox. It will also attempt to
    // reclaim resources eagerly, as soon as a sandbox is not needed. Hence,
    // multiple StopPodSandbox calls are expected.
    rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {}
    // RemovePodSandbox removes the sandbox. If there are any running containers
    // in the sandbox, they must be forcibly terminated and removed.
    // This call is idempotent, and must not return an error if the sandbox has
    // already been removed.
    rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {}
    // PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not
    // present, returns an error.
    rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}
    // ListPodSandbox returns a list of PodSandboxes.
    rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}

    // CreateContainer creates a new container in specified PodSandbox
    rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}
    // StartContainer starts the container.
    rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {}
    // StopContainer stops a running container with a grace period (i.e., timeout).
    // This call is idempotent, and must not return an error if the container has
    // already been stopped.
    // The runtime must forcibly kill the container after the grace period is
    // reached.
    rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {}
    // RemoveContainer removes the container. If the container is running, the
    // container must be forcibly removed.
    // This call is idempotent, and must not return an error if the container has
    // already been removed.
    rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}
    // ListContainers lists all containers by filters.
    rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}
    // ContainerStatus returns status of the container. If the container is not
    // present, returns an error.
    rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}
    // UpdateContainerResources updates ContainerConfig of the container synchronously.
    // If runtime fails to transactionally update the requested resources, an error is returned.
    rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}
    // ReopenContainerLog asks runtime to reopen the stdout/stderr log file
    // for the container. This is often called after the log file has been
    // rotated. If the container is not running, container runtime can choose
    // to either create a new log file and return nil, or return an error.
    // Once it returns error, new container log file MUST NOT be created.
    rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {}

    // ExecSync runs a command in a container synchronously.
    rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {}
    // Exec prepares a streaming endpoint to execute a command in the container.
    rpc Exec(ExecRequest) returns (ExecResponse) {}
    // Attach prepares a streaming endpoint to attach to a running container.
    rpc Attach(AttachRequest) returns (AttachResponse) {}
    // PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
    rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {}

    // ContainerStats returns stats of the container. If the container does not
    // exist, the call returns an error.
    rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {}
    // ListContainerStats returns stats of all running containers.
    rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {}

    // PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not
    // exist, the call returns an error.
    rpc PodSandboxStats(PodSandboxStatsRequest) returns (PodSandboxStatsResponse) {}
    // ListPodSandboxStats returns stats of the pod sandboxes matching a filter.
    rpc ListPodSandboxStats(ListPodSandboxStatsRequest) returns (ListPodSandboxStatsResponse) {}

    // UpdateRuntimeConfig updates the runtime configuration based on the given request.
    rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {}

    // Status returns the status of the runtime.
    rpc Status(StatusRequest) returns (StatusResponse) {}

    // CheckpointContainer checkpoints a container
    rpc CheckpointContainer(CheckpointContainerRequest) returns (CheckpointContainerResponse) {}

    // GetContainerEvents gets container events from the CRI runtime
    rpc GetContainerEvents(GetEventsRequest) returns (stream ContainerEventResponse) {}

    // ListMetricDescriptors gets the descriptors for the metrics that will be returned in ListPodSandboxMetrics.
    // This list should be static at startup: either the client and server restart together when
    // adding or removing metrics descriptors, or they should not change.
    // Put differently, if ListPodSandboxMetrics references a name that is not described in the initial
    // ListMetricDescriptors call, then the metric will not be broadcasted.
    rpc ListMetricDescriptors(ListMetricDescriptorsRequest) returns (ListMetricDescriptorsResponse) {}

    // ListPodSandboxMetrics gets pod sandbox metrics from CRI Runtime
    rpc ListPodSandboxMetrics(ListPodSandboxMetricsRequest) returns (ListPodSandboxMetricsResponse) {}

    // RuntimeConfig returns configuration information of the runtime.
    // A couple of notes:
    // - The RuntimeConfigRequest object is not to be confused with the contents of UpdateRuntimeConfigRequest.
    // The former is for having runtime tell Kubelet what to do, the latter vice versa.
    // - It is the expectation of the Kubelet that these fields are static for the lifecycle of the Kubelet.
    // The Kubelet will not re-request the RuntimeConfiguration after startup, and CRI implementations should
    // avoid updating them without a full node reboot.
    rpc RuntimeConfig(RuntimeConfigRequest) returns (RuntimeConfigResponse) {}
}

// ImageService defines the public APIs for managing images.
service ImageService {
    // ListImages lists existing images.
    rpc ListImages(ListImagesRequest) returns (ListImagesResponse) {}
    // ImageStatus returns the status of the image. If the image is not
    // present, returns a response with ImageStatusResponse.Image set to
    // nil.
    rpc ImageStatus(ImageStatusRequest) returns (ImageStatusResponse) {}
    // PullImage pulls an image with authentication config.
    rpc PullImage(PullImageRequest) returns (PullImageResponse) {}
    // RemoveImage removes the image.
    // This call is idempotent, and must not return an error if the image has
    // already been removed.
    rpc RemoveImage(RemoveImageRequest) returns (RemoveImageResponse) {}
    // ImageFSInfo returns information of the filesystem that is used to store images.
    rpc ImageFsInfo(ImageFsInfoRequest) returns (ImageFsInfoResponse) {}
}
......
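The client/server split in the CRI service definitions can be sketched without any gRPC dependency. The example below is only a model under loud assumptions: RuntimeService here is a hypothetical Go interface reduced to four methods with simplified signatures (the real generated interface takes request and response messages), fakeRuntime is an in-memory stand-in for a runtime such as containerd, and teardown plays the kubelet's role. It does show one requirement stated in the proto comments: StopPodSandbox and RemovePodSandbox are idempotent, so repeated calls must not return an error.

```go
package main

import (
	"errors"
	"fmt"
)

// RuntimeService is a hypothetical, heavily reduced view of the CRI service.
type RuntimeService interface {
	Version() (string, error)
	RunPodSandbox(name string) (id string, err error)
	StopPodSandbox(id string) error
	RemovePodSandbox(id string) error
}

// fakeRuntime plays the server role with in-memory state.
type fakeRuntime struct {
	sandboxes map[string]bool // sandbox ID -> still running
	nextID    int
}

func newFakeRuntime() *fakeRuntime {
	return &fakeRuntime{sandboxes: make(map[string]bool)}
}

func (f *fakeRuntime) Version() (string, error) { return "fake-runtime", nil }

func (f *fakeRuntime) RunPodSandbox(name string) (string, error) {
	if name == "" {
		return "", errors.New("sandbox name is required")
	}
	f.nextID++
	id := fmt.Sprintf("%s-%d", name, f.nextID)
	f.sandboxes[id] = true
	return id, nil
}

// StopPodSandbox is idempotent: an already stopped or unknown sandbox is not an error.
func (f *fakeRuntime) StopPodSandbox(id string) error {
	if _, ok := f.sandboxes[id]; ok {
		f.sandboxes[id] = false
	}
	return nil
}

// RemovePodSandbox is idempotent as well.
func (f *fakeRuntime) RemovePodSandbox(id string) error {
	delete(f.sandboxes, id)
	return nil
}

// teardown plays the client role: stop first, then remove.
func teardown(rt RuntimeService, id string) error {
	if err := rt.StopPodSandbox(id); err != nil {
		return err
	}
	return rt.RemovePodSandbox(id)
}

func main() {
	rt := newFakeRuntime()
	id, err := rt.RunPodSandbox("nginx")
	if err != nil {
		fmt.Println("run failed:", err)
		return
	}
	fmt.Println("sandbox:", id)
	fmt.Println("first teardown:", teardown(rt, id))
	fmt.Println("second teardown:", teardown(rt, id)) // repeated call, still no error
}
```

Because the caller depends only on the interface, the same client-side logic works against any runtime that implements the contract, which is the point of defining CRI as a gRPC service.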
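5.1.2. The CRI pb.go and _grpc.pb.go files
- In the cri-api project, the content of the pb.go and _grpc.pb.go files demonstrated in section 3.4 is generated into a single file: staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
- That file contains the Go structs for every message defined in the proto, the Go types for every service, the client interfaces and their implementations, and the server interfaces and their implementations.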
......
// RuntimeServiceClient is the client API for RuntimeService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type RuntimeServiceClient interface {
	// Version returns the runtime name, runtime version, and runtime API version.
	Version(ctx context.Context, in *VersionRequest, opts ...grpc.CallOption) (*VersionResponse, error)
	// RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
	// the sandbox is in the ready state on success.
	RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error)
	// StopPodSandbox stops any running process that is part of the sandbox and
	// reclaims network resources (e.g., IP addresses) allocated to the sandbox.
	// If there are any running containers in the sandbox, they must be forcibly
	// terminated.
	// This call is idempotent, and must not return an error if all relevant
	// resources have already been reclaimed. kubelet will call StopPodSandbox
	// at least once before calling RemovePodSandbox. It will also attempt to
	// reclaim resources eagerly, as soon as a sandbox is not needed. Hence,
	// multiple StopPodSandbox calls are expected.
	StopPodSandbox(ctx context.Context, in *StopPodSandboxRequest, opts ...grpc.CallOption) (*StopPodSandboxResponse, error)
	// RemovePodSandbox removes the sandbox. If there are any running containers
	// in the sandbox, they must be forcibly terminated and removed.
	// This call is idempotent, and must not return an error if the sandbox has
	// already been removed.
	RemovePodSandbox(ctx context.Context, in *RemovePodSandboxRequest, opts ...grpc.CallOption) (*RemovePodSandboxResponse, error)
	// PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not
	// present, returns an error.
	PodSandboxStatus(ctx context.Context, in *PodSandboxStatusRequest, opts ...grpc.CallOption) (*PodSandboxStatusResponse, error)
	// ListPodSandbox returns a list of PodSandboxes.
	ListPodSandbox(ctx context.Context, in *ListPodSandboxRequest, opts ...grpc.CallOption) (*ListPodSandboxResponse, error)
	// CreateContainer creates a new container in specified PodSandbox
	CreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error)
	// StartContainer starts the container.
	StartContainer(ctx context.Context, in *StartContainerRequest, opts ...grpc.CallOption) (*StartContainerResponse, error)
	// StopContainer stops a running container with a grace period (i.e., timeout).
	// This call is idempotent, and must not return an error if the container has
	// already been stopped.
	// The runtime must forcibly kill the container after the grace period is
	// reached.
	StopContainer(ctx context.Context, in *StopContainerRequest, opts ...grpc.CallOption) (*StopContainerResponse, error)
	// RemoveContainer removes the container. If the container is running, the
	// container must be forcibly removed.
	// This call is idempotent, and must not return an error if the container has
	// already been removed.
	RemoveContainer(ctx context.Context, in *RemoveContainerRequest, opts ...grpc.CallOption) (*RemoveContainerResponse, error)
	// ListContainers lists all containers by filters.
	ListContainers(ctx context.Context, in *ListContainersRequest, opts ...grpc.CallOption) (*ListContainersResponse, error)
	// ContainerStatus returns status of the container. If the container is not
	// present, returns an error.
	ContainerStatus(ctx context.Context, in *ContainerStatusRequest, opts ...grpc.CallOption) (*ContainerStatusResponse, error)
	// UpdateContainerResources updates ContainerConfig of the container synchronously.
	// If runtime fails to transactionally update the requested resources, an error is returned.
	UpdateContainerResources(ctx context.Context, in *UpdateContainerResourcesRequest, opts ...grpc.CallOption) (*UpdateContainerResourcesResponse, error)
	// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
	// for the container. This is often called after the log file has been
	// rotated. If the container is not running, container runtime can choose
	// to either create a new log file and return nil, or return an error.
	// Once it returns error, new container log file MUST NOT be created.
	ReopenContainerLog(ctx context.Context, in *ReopenContainerLogRequest, opts ...grpc.CallOption) (*ReopenContainerLogResponse, error)
	// ExecSync runs a command in a container synchronously.
	ExecSync(ctx context.Context, in *ExecSyncRequest, opts ...grpc.CallOption) (*ExecSyncResponse, error)
	// Exec prepares a streaming endpoint to execute a command in the container.
	Exec(ctx context.Context, in *ExecRequest, opts ...grpc.CallOption) (*ExecResponse, error)
	// Attach prepares a streaming endpoint to attach to a running container.
	Attach(ctx context.Context, in *AttachRequest, opts ...grpc.CallOption) (*AttachResponse, error)
	// PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
	PortForward(ctx context.Context, in *PortForwardRequest, opts ...grpc.CallOption) (*PortForwardResponse, error)
	// ContainerStats returns stats of the container. If the container does not
	// exist, the call returns an error.
	ContainerStats(ctx context.Context, in *ContainerStatsRequest, opts ...grpc.CallOption) (*ContainerStatsResponse, error)
	// ListContainerStats returns stats of all running containers.
	ListContainerStats(ctx context.Context, in *ListContainerStatsRequest, opts ...grpc.CallOption) (*ListContainerStatsResponse, error)
	// PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not
	// exist, the call returns an error.
	PodSandboxStats(ctx context.Context, in *PodSandboxStatsRequest, opts ...grpc.CallOption) (*PodSandboxStatsResponse, error)
	// ListPodSandboxStats returns stats of the pod sandboxes matching a filter.
	ListPodSandboxStats(ctx context.Context, in *ListPodSandboxStatsRequest, opts ...grpc.CallOption) (*ListPodSandboxStatsResponse, error)
	// UpdateRuntimeConfig updates the runtime configuration based on the given request.
	UpdateRuntimeConfig(ctx context.Context, in *UpdateRuntimeConfigRequest, opts ...grpc.CallOption) (*UpdateRuntimeConfigResponse, error)
	// Status returns the status of the runtime.
	Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error)
	// CheckpointContainer checkpoints a container
	CheckpointContainer(ctx context.Context, in *CheckpointContainerRequest, opts ...grpc.CallOption) (*CheckpointContainerResponse, error)
	// GetContainerEvents gets container events from the CRI runtime
	GetContainerEvents(ctx context.Context, in *GetEventsRequest, opts ...grpc.CallOption) (RuntimeService_GetContainerEventsClient, error)
	// ListMetricDescriptors gets the descriptors for the metrics that will be returned in ListPodSandboxMetrics.
	// This list should be static at startup: either the client and server restart together when
	// adding or removing metrics descriptors, or they should not change.
	// Put differently, if ListPodSandboxMetrics references a name that is not described in the initial
	// ListMetricDescriptors call, then the metric will not be broadcasted.
	ListMetricDescriptors(ctx context.Context, in *ListMetricDescriptorsRequest, opts ...grpc.CallOption) (*ListMetricDescriptorsResponse, error)
	// ListPodSandboxMetrics gets pod sandbox metrics from CRI Runtime
	ListPodSandboxMetrics(ctx context.Context, in *ListPodSandboxMetricsRequest, opts ...grpc.CallOption) (*ListPodSandboxMetricsResponse, error)
	// RuntimeConfig returns configuration information of the runtime.
	// A couple of notes:
	//   - The RuntimeConfigRequest object is not to be confused with the contents of UpdateRuntimeConfigRequest.
	//     The former is for having runtime tell Kubelet what to do, the latter vice versa.
	//   - It is the expectation of the Kubelet that these fields are static for the lifecycle of the Kubelet.
	//     The Kubelet will not re-request the RuntimeConfiguration after startup, and CRI implementations should
	//     avoid updating them without a full node reboot.
	RuntimeConfig(ctx context.Context, in *RuntimeConfigRequest, opts ...grpc.CallOption) (*RuntimeConfigResponse, error)
}

type runtimeServiceClient struct {
	cc *grpc.ClientConn
}

func NewRuntimeServiceClient(cc *grpc.ClientConn) RuntimeServiceClient {
	return &runtimeServiceClient{cc}
}

func (c *runtimeServiceClient) Version(ctx context.Context, in *VersionRequest, opts ...grpc.CallOption) (*VersionResponse, error) {
	out := new(VersionResponse)
	err := c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/Version", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {
	out := new(RunPodSandboxResponse)
	err := c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/RunPodSandbox", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// RuntimeServiceServer is the server API for RuntimeService service.
type RuntimeServiceServer interface {
	// Version returns the runtime name, runtime version, and runtime API version.
	Version(context.Context, *VersionRequest) (*VersionResponse, error)
	// RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure
	// the sandbox is in the ready state on success.
	RunPodSandbox(context.Context, *RunPodSandboxRequest) (*RunPodSandboxResponse, error)
	// StopPodSandbox stops any running process that is part of the sandbox and
	// reclaims network resources (e.g., IP addresses) allocated to the sandbox.
	// If there are any running containers in the sandbox, they must be forcibly
	// terminated.
	// This call is idempotent, and must not return an error if all relevant
	// resources have already been reclaimed. kubelet will call StopPodSandbox
	// at least once before calling RemovePodSandbox. It will also attempt to
	// reclaim resources eagerly, as soon as a sandbox is not needed. Hence,
	// multiple StopPodSandbox calls are expected.
	StopPodSandbox(context.Context, *StopPodSandboxRequest) (*StopPodSandboxResponse, error)
	// RemovePodSandbox removes the sandbox. If there are any running containers
	// in the sandbox, they must be forcibly terminated and removed.
	// This call is idempotent, and must not return an error if the sandbox has
	// already been removed.
	RemovePodSandbox(context.Context, *RemovePodSandboxRequest) (*RemovePodSandboxResponse, error)
	// PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not
	// present, returns an error.
	PodSandboxStatus(context.Context, *PodSandboxStatusRequest) (*PodSandboxStatusResponse, error)
	// ListPodSandbox returns a list of PodSandboxes.
	ListPodSandbox(context.Context, *ListPodSandboxRequest) (*ListPodSandboxResponse, error)
	// CreateContainer creates a new container in specified PodSandbox
	CreateContainer(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error)
	// StartContainer starts the container.
	StartContainer(context.Context, *StartContainerRequest) (*StartContainerResponse, error)
	// StopContainer stops a running container with a grace period (i.e., timeout).
	// This call is idempotent, and must not return an error if the container has
	// already been stopped.
	// The runtime must forcibly kill the container after the grace period is
	// reached.
	StopContainer(context.Context, *StopContainerRequest) (*StopContainerResponse, error)
	// RemoveContainer removes the container. If the container is running, the
	// container must be forcibly removed.
	// This call is idempotent, and must not return an error if the container has
	// already been removed.
	RemoveContainer(context.Context, *RemoveContainerRequest) (*RemoveContainerResponse, error)
	// ListContainers lists all containers by filters.
	ListContainers(context.Context, *ListContainersRequest) (*ListContainersResponse, error)
	// ContainerStatus returns status of the container. If the container is not
	// present, returns an error.
	ContainerStatus(context.Context, *ContainerStatusRequest) (*ContainerStatusResponse, error)
	// UpdateContainerResources updates ContainerConfig of the container synchronously.
	// If runtime fails to transactionally update the requested resources, an error is returned.
	UpdateContainerResources(context.Context, *UpdateContainerResourcesRequest) (*UpdateContainerResourcesResponse, error)
	// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
	// for the container. This is often called after the log file has been
	// rotated. If the container is not running, container runtime can choose
	// to either create a new log file and return nil, or return an error.
	// Once it returns error, new container log file MUST NOT be created.
	ReopenContainerLog(context.Context, *ReopenContainerLogRequest) (*ReopenContainerLogResponse, error)
	// ExecSync runs a command in a container synchronously.
	ExecSync(context.Context, *ExecSyncRequest) (*ExecSyncResponse, error)
	// Exec prepares a streaming endpoint to execute a command in the container.
	Exec(context.Context, *ExecRequest) (*ExecResponse, error)
	// Attach prepares a streaming endpoint to attach to a running container.
	Attach(context.Context, *AttachRequest) (*AttachResponse, error)
	// PortForward prepares a streaming endpoint to forward ports from a PodSandbox.
	PortForward(context.Context, *PortForwardRequest) (*PortForwardResponse, error)
	// ContainerStats returns stats of the container. If the container does not
	// exist, the call returns an error.
	ContainerStats(context.Context, *ContainerStatsRequest) (*ContainerStatsResponse, error)
	// ListContainerStats returns stats of all running containers.
	ListContainerStats(context.Context, *ListContainerStatsRequest) (*ListContainerStatsResponse, error)
	// PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not
	// exist, the call returns an error.
	PodSandboxStats(context.Context, *PodSandboxStatsRequest) (*PodSandboxStatsResponse, error)
	// ListPodSandboxStats returns stats of the pod sandboxes matching a filter.
	ListPodSandboxStats(context.Context, *ListPodSandboxStatsRequest) (*ListPodSandboxStatsResponse, error)
	// UpdateRuntimeConfig updates the runtime configuration based on the given request.
	UpdateRuntimeConfig(context.Context, *UpdateRuntimeConfigRequest) (*UpdateRuntimeConfigResponse, error)
	// Status returns the status of the runtime.
	Status(context.Context, *StatusRequest) (*StatusResponse, error)
	// CheckpointContainer checkpoints a container
	CheckpointContainer(context.Context, *CheckpointContainerRequest) (*CheckpointContainerResponse, error)
	// GetContainerEvents gets container events from the CRI runtime
	GetContainerEvents(*GetEventsRequest, RuntimeService_GetContainerEventsServer) error
	// ListMetricDescriptors gets the descriptors for the metrics that will be returned in ListPodSandboxMetrics.
	// This list should be static at startup: either the client and server restart together when
	// adding or removing metrics descriptors, or they should not change.
	// Put differently, if ListPodSandboxMetrics references a name that is not described in the initial
	// ListMetricDescriptors call, then the metric will not be broadcasted.
	ListMetricDescriptors(context.Context, *ListMetricDescriptorsRequest) (*ListMetricDescriptorsResponse, error)
	// ListPodSandboxMetrics gets pod sandbox metrics from CRI Runtime
	ListPodSandboxMetrics(context.Context, *ListPodSandboxMetricsRequest) (*ListPodSandboxMetricsResponse, error)
	// RuntimeConfig returns configuration information of the runtime.
	// A couple of notes:
	//   - The RuntimeConfigRequest object is not to be confused with the contents of UpdateRuntimeConfigRequest.
	//     The former is for having runtime tell Kubelet what to do, the latter vice versa.
	//   - It is the expectation of the Kubelet that these fields are static for the lifecycle of the Kubelet.
	//     The Kubelet will not re-request the RuntimeConfiguration after startup, and CRI implementations should
	//     avoid updating them without a full node reboot.
	RuntimeConfig(context.Context, *RuntimeConfigRequest) (*RuntimeConfigResponse, error)
}

// UnimplementedRuntimeServiceServer can be embedded to have forward compatible implementations.
type UnimplementedRuntimeServiceServer struct {
}

func (*UnimplementedRuntimeServiceServer) Version(ctx context.Context, req *VersionRequest) (*VersionResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method Version not implemented")
}
func (*UnimplementedRuntimeServiceServer) RunPodSandbox(ctx context.Context, req *RunPodSandboxRequest) (*RunPodSandboxResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method RunPodSandbox not implemented")
}
func (*UnimplementedRuntimeServiceServer) StopPodSandbox(ctx context.Context, req *StopPodSandboxRequest) (*StopPodSandboxResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method StopPodSandbox not implemented")
}
func (*UnimplementedRuntimeServiceServer) RemovePodSandbox(ctx context.Context, req *RemovePodSandboxRequest) (*RemovePodSandboxResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method RemovePodSandbox not implemented")
}
func (*UnimplementedRuntimeServiceServer) PodSandboxStatus(ctx context.Context, req *PodSandboxStatusRequest) (*PodSandboxStatusResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method PodSandboxStatus not implemented")
}
func (*UnimplementedRuntimeServiceServer) ListPodSandbox(ctx context.Context, req *ListPodSandboxRequest) (*ListPodSandboxResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method ListPodSandbox not implemented")
}
func (*UnimplementedRuntimeServiceServer) CreateContainer(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method CreateContainer not implemented")
}
func (*UnimplementedRuntimeServiceServer) StartContainer(ctx context.Context, req *StartContainerRequest) (*StartContainerResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method StartContainer not implemented")
}
func (*UnimplementedRuntimeServiceServer) StopContainer(ctx context.Context, req *StopContainerRequest) (*StopContainerResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method StopContainer not implemented")
}
func (*UnimplementedRuntimeServiceServer) RemoveContainer(ctx context.Context, req *RemoveContainerRequest) (*RemoveContainerResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method RemoveContainer not implemented")
}
func (*UnimplementedRuntimeServiceServer) ListContainers(ctx context.Context, req *ListContainersRequest) (*ListContainersResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method ListContainers not implemented")
}
func (*UnimplementedRuntimeServiceServer) ContainerStatus(ctx context.Context, req *ContainerStatusRequest) (*ContainerStatusResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method ContainerStatus not implemented")
}
func (*UnimplementedRuntimeServiceServer) UpdateContainerResources(ctx context.Context, req *UpdateContainerResourcesRequest) (*UpdateContainerResourcesResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method UpdateContainerResources not implemented")
}
func (*UnimplementedRuntimeServiceServer) ReopenContainerLog(ctx context.Context, req *ReopenContainerLogRequest) (*ReopenContainerLogResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method ReopenContainerLog not implemented")
}
func (*UnimplementedRuntimeServiceServer) ExecSync(ctx context.Context, req *ExecSyncRequest) (*ExecSyncResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method ExecSync not implemented")
}
func (*UnimplementedRuntimeServiceServer) Exec(ctx context.Context, req *ExecRequest) (*ExecResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method Exec not implemented")
}
func (*UnimplementedRuntimeServiceServer) Attach(ctx context.Context, req *AttachRequest) (*AttachResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method Attach not implemented")
}
func (*UnimplementedRuntimeServiceServer) PortForward(ctx context.Context, req *PortForwardRequest) (*PortForwardResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method PortForward not implemented")
}
func (*UnimplementedRuntimeServiceServer) ContainerStats(ctx context.Context, req *ContainerStatsRequest) (*ContainerStatsResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method ContainerStats not implemented")
}
func (*UnimplementedRuntimeServiceServer) ListContainerStats(ctx context.Context, req *ListContainerStatsRequest) (*ListContainerStatsResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method ListContainerStats not implemented")
}
func (*UnimplementedRuntimeServiceServer) PodSandboxStats(ctx context.Context, req *PodSandboxStatsRequest) (*PodSandboxStatsResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method PodSandboxStats not implemented")
}
func (*UnimplementedRuntimeServiceServer) ListPodSandboxStats(ctx context.Context, req *ListPodSandboxStatsRequest) (*ListPodSandboxStatsResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method ListPodSandboxStats not implemented")
}
func (*UnimplementedRuntimeServiceServer) UpdateRuntimeConfig(ctx context.Context, req *UpdateRuntimeConfigRequest) (*UpdateRuntimeConfigResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method UpdateRuntimeConfig not implemented")
}
func (*UnimplementedRuntimeServiceServer) Status(ctx context.Context, req *StatusRequest) (*StatusResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method Status not implemented")
}
func (*UnimplementedRuntimeServiceServer) CheckpointContainer(ctx context.Context, req *CheckpointContainerRequest) (*CheckpointContainerResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method CheckpointContainer not implemented")
}
func (*UnimplementedRuntimeServiceServer) GetContainerEvents(req *GetEventsRequest, srv RuntimeService_GetContainerEventsServer) error {
	return status.Errorf(codes.Unimplemented, "method GetContainerEvents not implemented")
}
func (*UnimplementedRuntimeServiceServer) ListMetricDescriptors(ctx context.Context, req *ListMetricDescriptorsRequest) (*ListMetricDescriptorsResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method ListMetricDescriptors not implemented")
}
func (*UnimplementedRuntimeServiceServer) ListPodSandboxMetrics(ctx context.Context, req *ListPodSandboxMetricsRequest) (*ListPodSandboxMetricsResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method ListPodSandboxMetrics not implemented")
}
func (*UnimplementedRuntimeServiceServer) RuntimeConfig(ctx context.Context, req *RuntimeConfigRequest) (*RuntimeConfigResponse, error) {
	return nil, status.Errorf(codes.Unimplemented, "method RuntimeConfig not implemented")
}

func RegisterRuntimeServiceServer(s *grpc.Server, srv RuntimeServiceServer) {
	s.RegisterService(&_RuntimeService_serviceDesc, srv)
}
......
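5.1.3.cri Manager file
- Besides the files above, the cri-api project also contains a services file: staging/src/k8s.io/cri-api/pkg/apis/services.go
- This file defines interfaces such as RuntimeService and ImageManagerService, which describe the concrete methods that CRI provides
- Clients can implement these methods themselves to better encapsulate the calls to the gRPC methods
- For example, cri-client implements every method of the RuntimeService and ImageManagerService interfaces in order to wrap gRPC more conveniently
- Take RunPodSandbox: cri-client implements the RuntimeService interface, and its implementation uses the client from the .pb.go file to call the gRPC method, start the sandbox, and obtain the result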
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cri

import (
	"context"
	"time"

	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

// RuntimeVersioner contains methods for runtime name, version and API version.
type RuntimeVersioner interface {
	// Version returns the runtime name, runtime version and runtime API version
	Version(ctx context.Context, apiVersion string) (*runtimeapi.VersionResponse, error)
}

// ContainerManager contains methods to manipulate containers managed by a
// container runtime. The methods are thread-safe.
type ContainerManager interface {
	// CreateContainer creates a new container in specified PodSandbox.
	CreateContainer(ctx context.Context, podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
	// StartContainer starts the container.
	StartContainer(ctx context.Context, containerID string) error
	// StopContainer stops a running container with a grace period (i.e., timeout).
	StopContainer(ctx context.Context, containerID string, timeout int64) error
	// RemoveContainer removes the container.
	RemoveContainer(ctx context.Context, containerID string) error
	// ListContainers lists all containers by filters.
	ListContainers(ctx context.Context, filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error)
	// ContainerStatus returns the status of the container.
	ContainerStatus(ctx context.Context, containerID string, verbose bool) (*runtimeapi.ContainerStatusResponse, error)
	// UpdateContainerResources updates ContainerConfig of the container synchronously.
	// If runtime fails to transactionally update the requested resources, an error is returned.
	UpdateContainerResources(ctx context.Context, containerID string, resources *runtimeapi.ContainerResources) error
	// ExecSync executes a command in the container, and returns the stdout output.
	// If command exits with a non-zero exit code, an error is returned.
	ExecSync(ctx context.Context, containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error)
	// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
	Exec(ctx context.Context, request *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
	// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
	Attach(ctx context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)
	// ReopenContainerLog asks runtime to reopen the stdout/stderr log file
	// for the container. If it returns error, new container log file MUST NOT
	// be created.
	ReopenContainerLog(ctx context.Context, ContainerID string) error
	// CheckpointContainer checkpoints a container
	CheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error
	// GetContainerEvents gets container events from the CRI runtime
	GetContainerEvents(ctx context.Context, containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error
}

// PodSandboxManager contains methods for operating on PodSandboxes. The methods
// are thread-safe.
type PodSandboxManager interface {
	// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
	// the sandbox is in ready state.
	RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error)
	// StopPodSandbox stops the sandbox. If there are any running containers in the
	// sandbox, they should be force terminated.
	StopPodSandbox(ctx context.Context, podSandboxID string) error
	// RemovePodSandbox removes the sandbox. If there are running containers in the
	// sandbox, they should be forcibly removed.
	RemovePodSandbox(ctx context.Context, podSandboxID string) error
	// PodSandboxStatus returns the Status of the PodSandbox.
	PodSandboxStatus(ctx context.Context, podSandboxID string, verbose bool) (*runtimeapi.PodSandboxStatusResponse, error)
	// ListPodSandbox returns a list of Sandbox.
	ListPodSandbox(ctx context.Context, filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error)
	// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
	PortForward(ctx context.Context, request *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
}

// ContainerStatsManager contains methods for retrieving the container
// statistics.
type ContainerStatsManager interface {
	// ContainerStats returns stats of the container. If the container does not
	// exist, the call returns an error.
	ContainerStats(ctx context.Context, containerID string) (*runtimeapi.ContainerStats, error)
	// ListContainerStats returns stats of all running containers.
	ListContainerStats(ctx context.Context, filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error)
	// PodSandboxStats returns stats of the pod. If the pod does not
	// exist, the call returns an error.
	PodSandboxStats(ctx context.Context, podSandboxID string) (*runtimeapi.PodSandboxStats, error)
	// ListPodSandboxStats returns stats of all running pods.
	ListPodSandboxStats(ctx context.Context, filter *runtimeapi.PodSandboxStatsFilter) ([]*runtimeapi.PodSandboxStats, error)
	// ListMetricDescriptors gets the descriptors for the metrics that will be returned in ListPodSandboxMetrics.
	ListMetricDescriptors(ctx context.Context) ([]*runtimeapi.MetricDescriptor, error)
	// ListPodSandboxMetrics returns metrics of all running pods.
	ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error)
}

// RuntimeService interface should be implemented by a container runtime.
// The methods should be thread-safe.
type RuntimeService interface {
	RuntimeVersioner
	ContainerManager
	PodSandboxManager
	ContainerStatsManager

	// UpdateRuntimeConfig updates runtime configuration if specified
	UpdateRuntimeConfig(ctx context.Context, runtimeConfig *runtimeapi.RuntimeConfig) error
	// Status returns the status of the runtime.
	Status(ctx context.Context, verbose bool) (*runtimeapi.StatusResponse, error)
	// RuntimeConfig returns the configuration information of the runtime.
	RuntimeConfig(ctx context.Context) (*runtimeapi.RuntimeConfigResponse, error)
}

// ImageManagerService interface should be implemented by a container image
// manager.
// The methods should be thread-safe.
type ImageManagerService interface {
	// ListImages lists the existing images.
	ListImages(ctx context.Context, filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error)
	// ImageStatus returns the status of the image.
	ImageStatus(ctx context.Context, image *runtimeapi.ImageSpec, verbose bool) (*runtimeapi.ImageStatusResponse, error)
	// PullImage pulls an image with the authentication config.
	PullImage(ctx context.Context, image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
	// RemoveImage removes the image.
	RemoveImage(ctx context.Context, image *runtimeapi.ImageSpec) error
	// ImageFsInfo returns information of the filesystem(s) used to store the read-only layers and the writeable layer.
	ImageFsInfo(ctx context.Context) (*runtimeapi.ImageFsInfoResponse, error)
}