云原生学习路线导航页(持续更新中)
- kubernetes学习系列快捷链接
- Kubernetes架构原则和对象设计(一)
- Kubernetes架构原则和对象设计(二)
- Kubernetes架构原则和对象设计(三)
- Kubernetes控制平面组件:etcd(一)
- Kubernetes控制平面组件:etcd(二)
- Kubernetes控制平面组件:API Server详解(一)
- Kubernetes控制平面组件:API Server详解(二)
- Kubernetes控制平面组件:调度器Scheduler(一)
- Kubernetes控制平面组件:调度器Scheduler(二)
- Kubernetes控制平面组件:Controller Manager详解
- Kubernetes控制平面组件:Controller Manager 之 内置Controller详解
- Kubernetes控制平面组件:Controller Manager 之 NamespaceController 全方位讲解
- Kubernetes控制平面组件:Kubelet详解(一):API接口层介绍
- Kubernetes控制平面组件:Kubelet详解(二):核心功能层
- Kubernetes控制平面组件:Kubelet详解(三):CRI 容器运行时接口层
本文是 kubernetes 的控制面组件 kubelet 系列文章第四篇,主要讲解了gRPC的基本概念,工作流程、关键特性等,并对protobuf的安装方法、使用方法 等做了介绍,随后给出了grpc服务端和客户端编写的案例,最后看了下cri-api项目如何使用grpc协议定义了cri接口
- 希望大家多多 点赞 关注 评论 收藏,作者会更有动力继续编写技术文章
1.为什么学习grpc
- 在上一节 Kubernetes控制平面组件:Kubelet详解(三):CRI 容器运行时接口层 中我们提到了 CRI是基于grpc的容器运行时接口标准,kubelet与实际运行时通过grpc交互,使用protobuf协议通信。那么grpc究竟是什么东西?
- 本节将学习下grpc的相关内容
2.RPC 与 gRPC 简介
2.1.应用架构的变化
2.1.1.单体架构
- 一旦某个服务宕机,会引起整个应用不可用,隔离性差
- 只能整体应用进行伸缩,浪费资源,可伸缩性差
- 代码耦合在一起,可维护性差
2.1.2.微服务架构
- 解决了单体架构的弊端,但同时引入了新问题:
- 代码冗余
- 服务和服务之间存在调用关系
- 微服务调用细节:
- 服务拆分后,调用变为进程间、服务器间的通信。
- 需发起网络调用(如 HTTP),但 HTTP 性能较低。
- 解决方案:引入 RPC(远程过程调用),通过自定义 TCP 协议提升传输效率。
2.2.RPC简介
2.2.1.RPC是什么
- RPC(Remote Procedure Call,远程过程调用)
- 全称:Remote Procedure Call
- 定义:一种用于屏蔽分布式计算中各种调用细节的协议,使开发者能够像调用本地函数一样直接调用远程函数。
2.2.2.RPC 客户端与服务端通信过程
- 客户端发送数据(以字节流形式传输)
- 服务端接收并解析数据,根据预定义约定执行对应操作
- 服务端将执行结果返回给客户端
2.2.1.RPC 的核心作用
- 封装优化:RPC就是将上述通信过程封装,简化操作流程
- 协议标准化:采用公认协议实现规范化通信
- 价值创造:通过框架工具直接或间接产生经济效益
2.3.gRPC简介
2.3.1.gRPC简介
- gRPC官方定义
- 英文原文:A high-performance, open-source universal RPC framework
- 中文释义:gRPC是一个高性能、开源的通用RPC框架。
- gprc官网:https://grpc.io/
- grpc中文文档:https://doc.oschina.net/grpc
- 简单理解,rpc是一种通信规范,grpc是对rpc协议的落地实现
- grpc 的g表示谷歌,并非go,grpc是支持多语言的
- grpc 的g表示谷歌,并非go,grpc是支持多语言的
2.3.2.gRPC核心概念
- 角色定义
- Client:调用方,客户端
- Server:被调用方,服务端
- 服务定义思想
- 通过语言无关的方式描述服务,包括:
- 服务名称
- 可调用方法
- 方法的入参与回参格式
- 通过语言无关的方式描述服务,包括:
2.3.3.gRPC工作流程
- client端:直接调用预定义的方法即可获得预期的结果,gRPC自动处理底层通信细节
- server端:只需实现定义的方法逻辑,gRPC自动处理底层通信细节
2.3.4.gRPC关键特性
- 接口约定模式
- 类似定义接口,Server实现接口,Client调用server实现的接口代理对象
- 其他的内容如通信、序列化等底层细节都交给gRPC
- 语言无关性
- 支持跨语言调用(如C++服务端 + Golang/Java客户端)
- 服务定义与编解码过程,均与语言无关
2.3.5.gRPC安装
go get google.golang.org/grpc
3.Protocol Buffers 详解
3.1.Protocol Buffers 简介
3.1.1.Protocol Buffers 是什么
- Protocol Buffers 是谷歌开源的一种数据格式,通常称为 protobuf,适合高性能,对响应速度有要求的数据传输场景。
- gRPC使用了 Protocol Buffers(protobuf) 进行数据的序列化、反序列化
- profobuf 是二进制数据格式,需要编码和解码。
- profobuf 数据本身不具有可读性,只能反序列化之后得到真正可读的数据。
- 怎么理解 protobuf ?
- 可以把他当成一个 代码生成工具以及序列化工具
- 这个工具可以把定义的方法,转换成特定语言的代码。比如你定义了一种类型的参数,他会帮你转换成Golang中的struct结构体,你定义的方法,他会帮你转换成func函数。
- 此外,在发送请求和接受响应的时候,这个工具还会完成对应的 编码和解码 工作,将你即将发送的数据编码成gRPC能够传输的形式,又或者 解码 接收到的数据格式 为 具体语言的某种数据格式
- 什么是序列化/反序列化
- 序列化:将数据结构或对象转换成二进制串的过程
- 反序列化:将在序列化过程中所产生的二进制串转换成数据结构或者对象的过程
3.1.2.protobuf 的优势
- 序列化后体积相比Json和XML 很小,适合网络传输
- 支持跨平台多语言
- 消息格式升级和兼容性还不错
- 序列化反序列化速度很快
3.2.protobuf 工具安装
3.2.1.protoc编译器安装
- windows
- 下载预编译的二进制文件 protoc-*.zip:https://github.com/protocolbuffers/protobuf/releases
- 解压到目录(如 C:\protobuf)。
- 将 bin 目录添加到系统环境变量 PATH 中。
- mac:
# 使用 Homebrew 安装 brew install protobuf
- 验证安装
protoc --version # 输出类似 libprotoc 29.3
3.2.2.go语言代码生成器安装
- 我们接下来代码示例都使用go语言,所以这里安装一个protoc-gen-go,用于将proto文件自动生成go文件
- 如果需要生成其他语言,需要安装其他的语言生成器
# 安装protoc-gen-go
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest# 确认插件安装成功(确保 $GOPATH/bin 在 PATH 中)
which protoc-gen-go
3.3.proto文件编写
- 下面示例包含 基本语法、常用类型、rpc方法
- 文件名:
person.proto
- proto文件的每个字段,都会指定一个唯一数字标识,决定生成的代码中,字段的位置顺序
syntax = "proto3"; // 使用 proto3 语法package example; // 包名(用于代码生成时的命名空间)option go_package = "github.com/yourusername/protobuf-example/person"; // Go 代码的导入路径// 定义枚举
enum Gender {UNKNOWN = 0;MALE = 1;FEMALE = 2;
}// 嵌套消息示例
message Address {string city = 1;string street = 2;int32 zip_code = 3;
}// 主消息
message Person {string name = 1; // 字符串类型int32 age = 2; // 整数类型Gender gender = 3; // 枚举类型repeated string hobbies = 4; // 数组类型(重复字段)Address address = 5; // 嵌套消息
}// 可选:RPC 服务定义(如果需要)
service PersonService {rpc GetPersonInfo (PersonRequest) returns (PersonResponse);
}message PersonRequest {int32 person_id = 1;
}message PersonResponse {Person person = 1;
}
3.4.protoc-gen-go 生成 go 代码
3.4.1.安装 Go 插件
# 安装 protoc-gen-go 插件(生成代码工具)
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest# 确认插件安装成功(确保 $GOPATH/bin 在 PATH 中)
which protoc-gen-go
3.4.2.生成 go 结构文件 pb.go
# 创建生成目录
mkdir -p person# 运行 protoc 命令,生成 go 结构文件
protoc \--go_out=./person \ # 输出目录--go_opt=paths=source_relative \ # 保持相对路径person.proto# 运行 protoc 命令,生成 go-grpc 文件
protoc \--go-grpc_out=./person \--go-grpc_opt=paths=source_relative \person.proto# 如果报错有下面报错,说明protoc-gen-go没有加入环境变量,处理一下就好
# protoc-gen-go: program not found or is not executable
# Please specify a program using absolute path or make sure the program is available in your PATH system variable
# --go_out: protoc-gen-go: Plugin failed with status code 1.
export PATH=$PATH:$HOME/go/bin
- 生成结果:
- 文件:
person/person.pb.go
- 内容包含:
Person
、Address
、Gender
等结构体和序列化方法。
- 文件:
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v5.29.3
// source: person.protopackage personimport (protoreflect "google.golang.org/protobuf/reflect/protoreflect"protoimpl "google.golang.org/protobuf/runtime/protoimpl"reflect "reflect"sync "sync"unsafe "unsafe"
)const (// Verify that this generated code is sufficiently up-to-date._ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)// Verify that runtime/protoimpl is sufficiently up-to-date._ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)// 定义枚举
type Gender int32const (Gender_UNKNOWN Gender = 0Gender_MALE Gender = 1Gender_FEMALE Gender = 2
)// Enum value maps for Gender.
var (Gender_name = map[int32]string{0: "UNKNOWN",1: "MALE",2: "FEMALE",}Gender_value = map[string]int32{"UNKNOWN": 0,"MALE": 1,"FEMALE": 2,}
)func (x Gender) Enum() *Gender {p := new(Gender)*p = xreturn p
}func (x Gender) String() string {return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}func (Gender) Descriptor() protoreflect.EnumDescriptor {return file_person_proto_enumTypes[0].Descriptor()
}func (Gender) Type() protoreflect.EnumType {return &file_person_proto_enumTypes[0]
}func (x Gender) Number() protoreflect.EnumNumber {return protoreflect.EnumNumber(x)
}// Deprecated: Use Gender.Descriptor instead.
func (Gender) EnumDescriptor() ([]byte, []int) {return file_person_proto_rawDescGZIP(), []int{0}
}// 嵌套消息示例
type Address struct {state protoimpl.MessageState `protogen:"open.v1"`City string `protobuf:"bytes,1,opt,name=city,proto3" json:"city,omitempty"`Street string `protobuf:"bytes,2,opt,name=street,proto3" json:"street,omitempty"`ZipCode int32 `protobuf:"varint,3,opt,name=zip_code,json=zipCode,proto3" json:"zip_code,omitempty"`unknownFields protoimpl.UnknownFieldssizeCache protoimpl.SizeCache
}func (x *Address) Reset() {*x = Address{}mi := &file_person_proto_msgTypes[0]ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))ms.StoreMessageInfo(mi)
}func (x *Address) String() string {return protoimpl.X.MessageStringOf(x)
}func (*Address) ProtoMessage() {}func (x *Address) ProtoReflect() protoreflect.Message {mi := &file_person_proto_msgTypes[0]if x != nil {ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))if ms.LoadMessageInfo() == nil {ms.StoreMessageInfo(mi)}return ms}return mi.MessageOf(x)
}// Deprecated: Use Address.ProtoReflect.Descriptor instead.
func (*Address) Descriptor() ([]byte, []int) {return file_person_proto_rawDescGZIP(), []int{0}
}func (x *Address) GetCity() string {if x != nil {return x.City}return ""
}func (x *Address) GetStreet() string {if x != nil {return x.Street}return ""
}func (x *Address) GetZipCode() int32 {if x != nil {return x.ZipCode}return 0
}// 主消息
type Person struct {state protoimpl.MessageState `protogen:"open.v1"`Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` // 字符串类型Age int32 `protobuf:"varint,2,opt,name=age,proto3" json:"age,omitempty"` // 整数类型Gender Gender `protobuf:"varint,3,opt,name=gender,proto3,enum=example.Gender" json:"gender,omitempty"` // 枚举类型Hobbies []string `protobuf:"bytes,4,rep,name=hobbies,proto3" json:"hobbies,omitempty"` // 数组类型(重复字段)Address *Address `protobuf:"bytes,5,opt,name=address,proto3" json:"address,omitempty"` // 嵌套消息unknownFields protoimpl.UnknownFieldssizeCache protoimpl.SizeCache
}func (x *Person) Reset() {*x = Person{}mi := &file_person_proto_msgTypes[1]ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))ms.StoreMessageInfo(mi)
}func (x *Person) String() string {return protoimpl.X.MessageStringOf(x)
}func (*Person) ProtoMessage() {}func (x *Person) ProtoReflect() protoreflect.Message {mi := &file_person_proto_msgTypes[1]if x != nil {ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))if ms.LoadMessageInfo() == nil {ms.StoreMessageInfo(mi)}return ms}return mi.MessageOf(x)
}// Deprecated: Use Person.ProtoReflect.Descriptor instead.
func (*Person) Descriptor() ([]byte, []int) {return file_person_proto_rawDescGZIP(), []int{1}
}func (x *Person) GetName() string {if x != nil {return x.Name}return ""
}func (x *Person) GetAge() int32 {if x != nil {return x.Age}return 0
}func (x *Person) GetGender() Gender {if x != nil {return x.Gender}return Gender_UNKNOWN
}func (x *Person) GetHobbies() []string {if x != nil {return x.Hobbies}return nil
}func (x *Person) GetAddress() *Address {if x != nil {return x.Address}return nil
}type PersonRequest struct {state protoimpl.MessageState `protogen:"open.v1"`PersonId int32 `protobuf:"varint,1,opt,name=person_id,json=personId,proto3" json:"person_id,omitempty"`unknownFields protoimpl.UnknownFieldssizeCache protoimpl.SizeCache
}func (x *PersonRequest) Reset() {*x = PersonRequest{}mi := &file_person_proto_msgTypes[2]ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))ms.StoreMessageInfo(mi)
}func (x *PersonRequest) String() string {return protoimpl.X.MessageStringOf(x)
}func (*PersonRequest) ProtoMessage() {}func (x *PersonRequest) ProtoReflect() protoreflect.Message {mi := &file_person_proto_msgTypes[2]if x != nil {ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))if ms.LoadMessageInfo() == nil {ms.StoreMessageInfo(mi)}return ms}return mi.MessageOf(x)
}// Deprecated: Use PersonRequest.ProtoReflect.Descriptor instead.
func (*PersonRequest) Descriptor() ([]byte, []int) {return file_person_proto_rawDescGZIP(), []int{2}
}func (x *PersonRequest) GetPersonId() int32 {if x != nil {return x.PersonId}return 0
}type PersonResponse struct {state protoimpl.MessageState `protogen:"open.v1"`Person *Person `protobuf:"bytes,1,opt,name=person,proto3" json:"person,omitempty"`unknownFields protoimpl.UnknownFieldssizeCache protoimpl.SizeCache
}func (x *PersonResponse) Reset() {*x = PersonResponse{}mi := &file_person_proto_msgTypes[3]ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))ms.StoreMessageInfo(mi)
}func (x *PersonResponse) String() string {return protoimpl.X.MessageStringOf(x)
}func (*PersonResponse) ProtoMessage() {}func (x *PersonResponse) ProtoReflect() protoreflect.Message {mi := &file_person_proto_msgTypes[3]if x != nil {ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))if ms.LoadMessageInfo() == nil {ms.StoreMessageInfo(mi)}return ms}return mi.MessageOf(x)
}// Deprecated: Use PersonResponse.ProtoReflect.Descriptor instead.
func (*PersonResponse) Descriptor() ([]byte, []int) {return file_person_proto_rawDescGZIP(), []int{3}
}func (x *PersonResponse) GetPerson() *Person {if x != nil {return x.Person}return nil
}var File_person_proto protoreflect.FileDescriptorconst file_person_proto_rawDesc = "" +"\n" +"\fperson.proto\x12\aexample\"P\n" +"\aAddress\x12\x12\n" +"\x04city\x18\x01 \x01(\tR\x04city\x12\x16\n" +"\x06street\x18\x02 \x01(\tR\x06street\x12\x19\n" +"\bzip_code\x18\x03 \x01(\x05R\azipCode\"\x9d\x01\n" +"\x06Person\x12\x12\n" +"\x04name\x18\x01 \x01(\tR\x04name\x12\x10\n" +"\x03age\x18\x02 \x01(\x05R\x03age\x12'\n" +"\x06gender\x18\x03 \x01(\x0e2\x0f.example.GenderR\x06gender\x12\x18\n" +"\ahobbies\x18\x04 \x03(\tR\ahobbies\x12*\n" +"\aaddress\x18\x05 \x01(\v2\x10.example.AddressR\aaddress\",\n" +"\rPersonRequest\x12\x1b\n" +"\tperson_id\x18\x01 \x01(\x05R\bpersonId\"9\n" +"\x0ePersonResponse\x12'\n" +"\x06person\x18\x01 \x01(\v2\x0f.example.PersonR\x06person*+\n" +"\x06Gender\x12\v\n" +"\aUNKNOWN\x10\x00\x12\b\n" +"\x04MALE\x10\x01\x12\n" +"\n" +"\x06FEMALE\x10\x022Q\n" +"\rPersonService\x12@\n" +"\rGetPersonInfo\x12\x16.example.PersonRequest\x1a\x17.example.PersonResponseB1Z/github.com/yourusername/protobuf-example/personb\x06proto3"var (file_person_proto_rawDescOnce sync.Oncefile_person_proto_rawDescData []byte
)func file_person_proto_rawDescGZIP() []byte {file_person_proto_rawDescOnce.Do(func() {file_person_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_person_proto_rawDesc), len(file_person_proto_rawDesc)))})return file_person_proto_rawDescData
}var file_person_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_person_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_person_proto_goTypes = []any{(Gender)(0), // 0: example.Gender(*Address)(nil), // 1: example.Address(*Person)(nil), // 2: example.Person(*PersonRequest)(nil), // 3: example.PersonRequest(*PersonResponse)(nil), // 4: example.PersonResponse
}
var file_person_proto_depIdxs = []int32{0, // 0: example.Person.gender:type_name -> example.Gender1, // 1: example.Person.address:type_name -> example.Address2, // 2: example.PersonResponse.person:type_name -> example.Person3, // 3: example.PersonService.GetPersonInfo:input_type -> example.PersonRequest4, // 4: example.PersonService.GetPersonInfo:output_type -> example.PersonResponse4, // [4:5] is the sub-list for method output_type3, // [3:4] is the sub-list for method input_type3, // [3:3] is the sub-list for extension type_name3, // [3:3] is the sub-list for extension extendee0, // [0:3] is the sub-list for field type_name
}func init() { file_person_proto_init() }
func file_person_proto_init() {if File_person_proto != nil {return}type x struct{}out := protoimpl.TypeBuilder{File: protoimpl.DescBuilder{GoPackagePath: reflect.TypeOf(x{}).PkgPath(),RawDescriptor: unsafe.Slice(unsafe.StringData(file_person_proto_rawDesc), len(file_person_proto_rawDesc)),NumEnums: 1,NumMessages: 4,NumExtensions: 0,NumServices: 1,},GoTypes: file_person_proto_goTypes,DependencyIndexes: file_person_proto_depIdxs,EnumInfos: file_person_proto_enumTypes,MessageInfos: file_person_proto_msgTypes,}.Build()File_person_proto = out.Filefile_person_proto_goTypes = nilfile_person_proto_depIdxs = nil
}
3.4.3.生成 go-grpc 文件 _grpc.pb.go
# 运行 protoc 命令,生成 go-grpc 文件
protoc \--go-grpc_out=./person \--go-grpc_opt=paths=source_relative \person.proto# 如果报错有下面报错,说明protoc-gen-go没有加入环境变量,处理一下就好
# protoc-gen-go: program not found or is not executable
# Please specify a program using absolute path or make sure the program is available in your PATH system variable
# --go_out: protoc-gen-go: Plugin failed with status code 1.
export PATH=$PATH:$HOME/go/bin
- 生成结果:
- 文件:
person/person_grpc.pb.go
- 内容包含:
PersonServiceClient
、PersonServiceServer
等 客户端-服务端 Interface,用于外部使用。- 自动生成
PersonServiceClient
、PersonServiceServer
接口的实现,比如UnimplementedPersonServiceServer,不过server我们一般都会在自行编写,不会直接使用 - 自动生成 将本grpc server 注册到 grpc内部注册中心的方法,即:
RegisterPersonServiceServer
,后面编写服务端时会用到
- 文件:
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v5.29.3
// source: person.protopackage personimport (context "context"grpc "google.golang.org/grpc"codes "google.golang.org/grpc/codes"status "google.golang.org/grpc/status"
)// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9const (PersonService_GetPersonInfo_FullMethodName = "/example.PersonService/GetPersonInfo"
)// PersonServiceClient is the client API for PersonService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
//
// 可选:RPC 服务定义(如果需要)
type PersonServiceClient interface {GetPersonInfo(ctx context.Context, in *PersonRequest, opts ...grpc.CallOption) (*PersonResponse, error)
}type personServiceClient struct {cc grpc.ClientConnInterface
}func NewPersonServiceClient(cc grpc.ClientConnInterface) PersonServiceClient {return &personServiceClient{cc}
}func (c *personServiceClient) GetPersonInfo(ctx context.Context, in *PersonRequest, opts ...grpc.CallOption) (*PersonResponse, error) {cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)out := new(PersonResponse)err := c.cc.Invoke(ctx, PersonService_GetPersonInfo_FullMethodName, in, out, cOpts...)if err != nil {return nil, err}return out, nil
}// PersonServiceServer is the server API for PersonService service.
// All implementations must embed UnimplementedPersonServiceServer
// for forward compatibility.
//
// 可选:RPC 服务定义(如果需要)
type PersonServiceServer interface {GetPersonInfo(context.Context, *PersonRequest) (*PersonResponse, error)mustEmbedUnimplementedPersonServiceServer()
}// UnimplementedPersonServiceServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedPersonServiceServer struct{}func (UnimplementedPersonServiceServer) GetPersonInfo(context.Context, *PersonRequest) (*PersonResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method GetPersonInfo not implemented")
}
func (UnimplementedPersonServiceServer) mustEmbedUnimplementedPersonServiceServer() {}
func (UnimplementedPersonServiceServer) testEmbeddedByValue() {}// UnsafePersonServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to PersonServiceServer will
// result in compilation errors.
type UnsafePersonServiceServer interface {mustEmbedUnimplementedPersonServiceServer()
}func RegisterPersonServiceServer(s grpc.ServiceRegistrar, srv PersonServiceServer) {// If the following call pancis, it indicates UnimplementedPersonServiceServer was// embedded by pointer and is nil. This will cause panics if an// unimplemented method is ever invoked, so we test this at initialization// time to prevent it from happening at runtime later due to I/O.if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {t.testEmbeddedByValue()}s.RegisterService(&PersonService_ServiceDesc, srv)
}func _PersonService_GetPersonInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {in := new(PersonRequest)if err := dec(in); err != nil {return nil, err}if interceptor == nil {return srv.(PersonServiceServer).GetPersonInfo(ctx, in)}info := &grpc.UnaryServerInfo{Server: srv,FullMethod: PersonService_GetPersonInfo_FullMethodName,}handler := func(ctx context.Context, req interface{}) (interface{}, error) {return srv.(PersonServiceServer).GetPersonInfo(ctx, req.(*PersonRequest))}return interceptor(ctx, in, info, handler)
}// PersonService_ServiceDesc is the grpc.ServiceDesc for PersonService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var PersonService_ServiceDesc = grpc.ServiceDesc{ServiceName: "example.PersonService",HandlerType: (*PersonServiceServer)(nil),Methods: []grpc.MethodDesc{{MethodName: "GetPersonInfo",Handler: _PersonService_GetPersonInfo_Handler,},},Streams: []grpc.StreamDesc{},Metadata: "person.proto",
}
4.基于gRPC的客户端与服务端编写
项目目录:
.
├── client
│ └── client.go
├── proto
│ ├── person
│ │ ├── person.pb.go
│ │ └── person_grpc.pb.go
│ └── person.proto
└── server└── server.go
4.1.gRPC服务端编写
4.1.1.服务端逻辑
- 创建 gRPC Server 对象,即服务端抽象核心
- 将server对象注册到 gRPC Server 的内部注册中心,这样可以在接收到请求时,通过内部服务发现,发现服务端接口并转接进行逻辑处理
- 创建 Listen 监听 TCP 端口 → 建立网络通信通道
- 启动 gRPC Server 接受请求 → 持续处理客户端连接
4.1.2.具体代码实现
package mainimport ("context""fmt"pb "test-project/rpc/proto/person""google.golang.org/grpc""net"
)// 自行实现 grpc 的server端,没有直接使用 _grpc.pb.g 文件自动生成的server
type server struct {pb.UnimplementedPersonServiceServer
}func (s *server) GetPersonInfo(ctx context.Context, req *pb.PersonRequest) (*pb.PersonResponse, error) {fmt.Println(fmt.Sprintf("get person info by id: %v", req.GetPersonId()))return &pb.PersonResponse{Person: &pb.Person{Name: "graham",Age: 18,Gender: pb.Gender_MALE,Hobbies: nil,Address: &pb.Address{City: "ShangHai",Street: "waitan",ZipCode: 1,},},}, nil
}func main() {// 开启端口listen, _ := net.Listen("tcp", ":8090")// 创建grpc服务,这里启动的服务没有开启安全认证,生产使用要加一些认证grpcServer := grpc.NewServer()// 在grpc服务端 注册我们自己编写的服务pb.RegisterPersonServiceServer(grpcServer, &server{})// 启动grpc服务if err := grpcServer.Serve(listen); err != nil {fmt.Printf("failed to server: %v", err)return}
}
4.2.gRPC客户端编写
4.2.1.客户端逻辑
- 创建于给定目标(服务端)的连接交互
- 创建 server的客户端对象
- 发送rpc请求,等到同步响应,得到回调后的响应结果
- 输出响应结果
4.2.2.具体代码实现
package mainimport ("context""fmt"pb "test-project/rpc/proto/person""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure""os"
)func main() {// 创建一个到 grpc server 的连接,这里先使用了无加密和验证的连接,生产使用要加一些认证conn, err := grpc.Dial("127.0.0.1:8090", grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {fmt.Printf("did not connect: %v", err)os.Exit(1)}defer func() {if err := conn.Close(); err != nil {fmt.Printf("did not close connect: %v", err)return}}()// 使用连接创建一个客户端对象【由自动生成的 *_grpc.pb.go 提供】client := pb.NewPersonServiceClient(conn)// 使用客户端,调用GetPersonInfo grpc方法resp, err := client.GetPersonInfo(context.Background(), &pb.PersonRequest{PersonId: 1})if err != nil {fmt.Printf("call grpc func GetPersonInfo failed: %v", err)return}fmt.Println(resp)}
5.CRI gRPC实现
前面学习完了grpc的基础知识,我们应该可以看懂CRI的基本实现
5.1.1.cri proto文件
- github地址:https://github.com/kubernetes/cri-api
- 路径:
staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.proto
- 文件内容较多,这里只贴出两个service服务接口:RuntimeService、ImageService
- 可以看出,里面定义了很多rpc方法,供具体的服务端实现
- kubelet 作为client调用接口,containerd作为服务端实现接口
......
// Runtime service defines the public APIs for remote container runtimes
service RuntimeService {// Version returns the runtime name, runtime version, and runtime API version.rpc Version(VersionRequest) returns (VersionResponse) {}// RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure// the sandbox is in the ready state on success.rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {}// StopPodSandbox stops any running process that is part of the sandbox and// reclaims network resources (e.g., IP addresses) allocated to the sandbox.// If there are any running containers in the sandbox, they must be forcibly// terminated.// This call is idempotent, and must not return an error if all relevant// resources have already been reclaimed. kubelet will call StopPodSandbox// at least once before calling RemovePodSandbox. It will also attempt to// reclaim resources eagerly, as soon as a sandbox is not needed. Hence,// multiple StopPodSandbox calls are expected.rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {}// RemovePodSandbox removes the sandbox. If there are any running containers// in the sandbox, they must be forcibly terminated and removed.// This call is idempotent, and must not return an error if the sandbox has// already been removed.rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {}// PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not// present, returns an error.rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {}// ListPodSandbox returns a list of PodSandboxes.rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {}// CreateContainer creates a new container in specified PodSandboxrpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {}// StartContainer starts the container.rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {}// StopContainer stops a running container with a grace period (i.e., timeout).// This call is idempotent, and must not return an error if the container has// already been stopped.// The runtime must forcibly kill the container after the grace period is// reached.rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {}// RemoveContainer removes the container. If the container is running, the// container must be forcibly removed.// This call is idempotent, and must not return an error if the container has// already been removed.rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {}// ListContainers lists all containers by filters.rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {}// ContainerStatus returns status of the container. If the container is not// present, returns an error.rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {}// UpdateContainerResources updates ContainerConfig of the container synchronously.// If runtime fails to transactionally update the requested resources, an error is returned.rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {}// ReopenContainerLog asks runtime to reopen the stdout/stderr log file// for the container. This is often called after the log file has been// rotated. If the container is not running, container runtime can choose// to either create a new log file and return nil, or return an error.// Once it returns error, new container log file MUST NOT be created.rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {}// ExecSync runs a command in a container synchronously.rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {}// Exec prepares a streaming endpoint to execute a command in the container.rpc Exec(ExecRequest) returns (ExecResponse) {}// Attach prepares a streaming endpoint to attach to a running container.rpc Attach(AttachRequest) returns (AttachResponse) {}// PortForward prepares a streaming endpoint to forward ports from a PodSandbox.rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {}// ContainerStats returns stats of the container. If the container does not// exist, the call returns an error.rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {}// ListContainerStats returns stats of all running containers.rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {}// PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not// exist, the call returns an error.rpc PodSandboxStats(PodSandboxStatsRequest) returns (PodSandboxStatsResponse) {}// ListPodSandboxStats returns stats of the pod sandboxes matching a filter.rpc ListPodSandboxStats(ListPodSandboxStatsRequest) returns (ListPodSandboxStatsResponse) {}// UpdateRuntimeConfig updates the runtime configuration based on the given request.rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {}// Status returns the status of the runtime.rpc Status(StatusRequest) returns (StatusResponse) {}// CheckpointContainer checkpoints a containerrpc CheckpointContainer(CheckpointContainerRequest) returns (CheckpointContainerResponse) {}// GetContainerEvents gets container events from the CRI runtimerpc GetContainerEvents(GetEventsRequest) returns (stream ContainerEventResponse) {}// ListMetricDescriptors gets the descriptors for the metrics that will be returned in ListPodSandboxMetrics.// This list should be static at startup: either the client and server restart together when// adding or removing metrics descriptors, or they should not change.// Put differently, if ListPodSandboxMetrics references a name that is not described in the initial// ListMetricDescriptors call, then the metric will not be broadcasted.rpc ListMetricDescriptors(ListMetricDescriptorsRequest) returns (ListMetricDescriptorsResponse) {}// ListPodSandboxMetrics gets pod sandbox metrics from CRI Runtimerpc ListPodSandboxMetrics(ListPodSandboxMetricsRequest) returns (ListPodSandboxMetricsResponse) {}// RuntimeConfig returns configuration information of the runtime.// A couple of notes:// - The RuntimeConfigRequest object is not to be confused with the contents of UpdateRuntimeConfigRequest.// The former is for having runtime tell Kubelet what to do, the latter vice versa.// - It is the expectation of the Kubelet that these fields are static for the lifecycle of the Kubelet.// The Kubelet will not re-request the RuntimeConfiguration after startup, and CRI implementations should// avoid updating them without a full node reboot.rpc RuntimeConfig(RuntimeConfigRequest) returns (RuntimeConfigResponse) {}
}// ImageService defines the public APIs for managing images.
service ImageService {// ListImages lists existing images.rpc ListImages(ListImagesRequest) returns (ListImagesResponse) {}// ImageStatus returns the status of the image. If the image is not// present, returns a response with ImageStatusResponse.Image set to// nil.rpc ImageStatus(ImageStatusRequest) returns (ImageStatusResponse) {}// PullImage pulls an image with authentication config.rpc PullImage(PullImageRequest) returns (PullImageResponse) {}// RemoveImage removes the image.// This call is idempotent, and must not return an error if the image has// already been removed.rpc RemoveImage(RemoveImageRequest) returns (RemoveImageResponse) {}// ImageFSInfo returns information of the filesystem that is used to store images.rpc ImageFsInfo(ImageFsInfoRequest) returns (ImageFsInfoResponse) {}
}
......
5.1.2.cri pb.go 与 _grpc.pb.go 文件
- cri-api项目中,将我们在3.4中演示的 pb.go 与 _grpc.pb.go 文件内容,生成到了一个文件中:
staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
- 该文件包含 proto定义的所有对象结构go结构、所有service的go结构、client接口及对象、server接口及对象
......// RuntimeServiceClient is the client API for RuntimeService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type RuntimeServiceClient interface {// Version returns the runtime name, runtime version, and runtime API version.Version(ctx context.Context, in *VersionRequest, opts ...grpc.CallOption) (*VersionResponse, error)// RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure// the sandbox is in the ready state on success.RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error)// StopPodSandbox stops any running process that is part of the sandbox and// reclaims network resources (e.g., IP addresses) allocated to the sandbox.// If there are any running containers in the sandbox, they must be forcibly// terminated.// This call is idempotent, and must not return an error if all relevant// resources have already been reclaimed. kubelet will call StopPodSandbox// at least once before calling RemovePodSandbox. It will also attempt to// reclaim resources eagerly, as soon as a sandbox is not needed. Hence,// multiple StopPodSandbox calls are expected.StopPodSandbox(ctx context.Context, in *StopPodSandboxRequest, opts ...grpc.CallOption) (*StopPodSandboxResponse, error)// RemovePodSandbox removes the sandbox. If there are any running containers// in the sandbox, they must be forcibly terminated and removed.// This call is idempotent, and must not return an error if the sandbox has// already been removed.RemovePodSandbox(ctx context.Context, in *RemovePodSandboxRequest, opts ...grpc.CallOption) (*RemovePodSandboxResponse, error)// PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not// present, returns an error.PodSandboxStatus(ctx context.Context, in *PodSandboxStatusRequest, opts ...grpc.CallOption) (*PodSandboxStatusResponse, error)// ListPodSandbox returns a list of PodSandboxes.ListPodSandbox(ctx context.Context, in *ListPodSandboxRequest, opts ...grpc.CallOption) (*ListPodSandboxResponse, error)// CreateContainer creates a new container in specified PodSandboxCreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error)// StartContainer starts the container.StartContainer(ctx context.Context, in *StartContainerRequest, opts ...grpc.CallOption) (*StartContainerResponse, error)// StopContainer stops a running container with a grace period (i.e., timeout).// This call is idempotent, and must not return an error if the container has// already been stopped.// The runtime must forcibly kill the container after the grace period is// reached.StopContainer(ctx context.Context, in *StopContainerRequest, opts ...grpc.CallOption) (*StopContainerResponse, error)// RemoveContainer removes the container. If the container is running, the// container must be forcibly removed.// This call is idempotent, and must not return an error if the container has// already been removed.RemoveContainer(ctx context.Context, in *RemoveContainerRequest, opts ...grpc.CallOption) (*RemoveContainerResponse, error)// ListContainers lists all containers by filters.ListContainers(ctx context.Context, in *ListContainersRequest, opts ...grpc.CallOption) (*ListContainersResponse, error)// ContainerStatus returns status of the container. If the container is not// present, returns an error.ContainerStatus(ctx context.Context, in *ContainerStatusRequest, opts ...grpc.CallOption) (*ContainerStatusResponse, error)// UpdateContainerResources updates ContainerConfig of the container synchronously.// If runtime fails to transactionally update the requested resources, an error is returned.UpdateContainerResources(ctx context.Context, in *UpdateContainerResourcesRequest, opts ...grpc.CallOption) (*UpdateContainerResourcesResponse, error)// ReopenContainerLog asks runtime to reopen the stdout/stderr log file// for the container. This is often called after the log file has been// rotated. If the container is not running, container runtime can choose// to either create a new log file and return nil, or return an error.// Once it returns error, new container log file MUST NOT be created.ReopenContainerLog(ctx context.Context, in *ReopenContainerLogRequest, opts ...grpc.CallOption) (*ReopenContainerLogResponse, error)// ExecSync runs a command in a container synchronously.ExecSync(ctx context.Context, in *ExecSyncRequest, opts ...grpc.CallOption) (*ExecSyncResponse, error)// Exec prepares a streaming endpoint to execute a command in the container.Exec(ctx context.Context, in *ExecRequest, opts ...grpc.CallOption) (*ExecResponse, error)// Attach prepares a streaming endpoint to attach to a running container.Attach(ctx context.Context, in *AttachRequest, opts ...grpc.CallOption) (*AttachResponse, error)// PortForward prepares a streaming endpoint to forward ports from a PodSandbox.PortForward(ctx context.Context, in *PortForwardRequest, opts ...grpc.CallOption) (*PortForwardResponse, error)// ContainerStats returns stats of the container. If the container does not// exist, the call returns an error.ContainerStats(ctx context.Context, in *ContainerStatsRequest, opts ...grpc.CallOption) (*ContainerStatsResponse, error)// ListContainerStats returns stats of all running containers.ListContainerStats(ctx context.Context, in *ListContainerStatsRequest, opts ...grpc.CallOption) (*ListContainerStatsResponse, error)// PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not// exist, the call returns an error.PodSandboxStats(ctx context.Context, in *PodSandboxStatsRequest, opts ...grpc.CallOption) (*PodSandboxStatsResponse, error)// ListPodSandboxStats returns stats of the pod sandboxes matching a filter.ListPodSandboxStats(ctx context.Context, in *ListPodSandboxStatsRequest, opts ...grpc.CallOption) (*ListPodSandboxStatsResponse, error)// UpdateRuntimeConfig updates the runtime configuration based on the given request.UpdateRuntimeConfig(ctx context.Context, in *UpdateRuntimeConfigRequest, opts ...grpc.CallOption) (*UpdateRuntimeConfigResponse, error)// Status returns the status of the runtime.Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error)// CheckpointContainer checkpoints a containerCheckpointContainer(ctx context.Context, in *CheckpointContainerRequest, opts ...grpc.CallOption) (*CheckpointContainerResponse, error)// GetContainerEvents gets container events from the CRI runtimeGetContainerEvents(ctx context.Context, in *GetEventsRequest, opts ...grpc.CallOption) (RuntimeService_GetContainerEventsClient, error)// ListMetricDescriptors gets the descriptors for the metrics that will be returned in ListPodSandboxMetrics.// This list should be static at startup: either the client and server restart together when// adding or removing metrics descriptors, or they should not change.// Put differently, if ListPodSandboxMetrics references a name that is not described in the initial// ListMetricDescriptors call, then the metric will not be broadcasted.ListMetricDescriptors(ctx context.Context, in *ListMetricDescriptorsRequest, opts ...grpc.CallOption) (*ListMetricDescriptorsResponse, error)// ListPodSandboxMetrics gets pod sandbox metrics from CRI RuntimeListPodSandboxMetrics(ctx context.Context, in *ListPodSandboxMetricsRequest, opts ...grpc.CallOption) (*ListPodSandboxMetricsResponse, error)// RuntimeConfig returns configuration information of the runtime.// A couple of notes:// - The RuntimeConfigRequest object is not to be confused with the contents of UpdateRuntimeConfigRequest.// The former is for having runtime tell Kubelet what to do, the latter vice versa.// - It is the expectation of the Kubelet that these fields are static for the lifecycle of the Kubelet.// The Kubelet will not re-request the RuntimeConfiguration after startup, and CRI implementations should// avoid updating them without a full node reboot.RuntimeConfig(ctx context.Context, in *RuntimeConfigRequest, opts ...grpc.CallOption) (*RuntimeConfigResponse, error)
}type runtimeServiceClient struct {cc *grpc.ClientConn
}func NewRuntimeServiceClient(cc *grpc.ClientConn) RuntimeServiceClient {return &runtimeServiceClient{cc}
}func (c *runtimeServiceClient) Version(ctx context.Context, in *VersionRequest, opts ...grpc.CallOption) (*VersionResponse, error) {out := new(VersionResponse)err := c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/Version", in, out, opts...)if err != nil {return nil, err}return out, nil
}func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {out := new(RunPodSandboxResponse)err := c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/RunPodSandbox", in, out, opts...)if err != nil {return nil, err}return out, nil
}// RuntimeServiceServer is the server API for RuntimeService service.
type RuntimeServiceServer interface {// Version returns the runtime name, runtime version, and runtime API version.Version(context.Context, *VersionRequest) (*VersionResponse, error)// RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure// the sandbox is in the ready state on success.RunPodSandbox(context.Context, *RunPodSandboxRequest) (*RunPodSandboxResponse, error)// StopPodSandbox stops any running process that is part of the sandbox and// reclaims network resources (e.g., IP addresses) allocated to the sandbox.// If there are any running containers in the sandbox, they must be forcibly// terminated.// This call is idempotent, and must not return an error if all relevant// resources have already been reclaimed. kubelet will call StopPodSandbox// at least once before calling RemovePodSandbox. It will also attempt to// reclaim resources eagerly, as soon as a sandbox is not needed. Hence,// multiple StopPodSandbox calls are expected.StopPodSandbox(context.Context, *StopPodSandboxRequest) (*StopPodSandboxResponse, error)// RemovePodSandbox removes the sandbox. If there are any running containers// in the sandbox, they must be forcibly terminated and removed.// This call is idempotent, and must not return an error if the sandbox has// already been removed.RemovePodSandbox(context.Context, *RemovePodSandboxRequest) (*RemovePodSandboxResponse, error)// PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not// present, returns an error.PodSandboxStatus(context.Context, *PodSandboxStatusRequest) (*PodSandboxStatusResponse, error)// ListPodSandbox returns a list of PodSandboxes.ListPodSandbox(context.Context, *ListPodSandboxRequest) (*ListPodSandboxResponse, error)// CreateContainer creates a new container in specified PodSandboxCreateContainer(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error)// StartContainer starts the container.StartContainer(context.Context, *StartContainerRequest) (*StartContainerResponse, error)// StopContainer stops a running container with a grace period (i.e., timeout).// This call is idempotent, and must not return an error if the container has// already been stopped.// The runtime must forcibly kill the container after the grace period is// reached.StopContainer(context.Context, *StopContainerRequest) (*StopContainerResponse, error)// RemoveContainer removes the container. If the container is running, the// container must be forcibly removed.// This call is idempotent, and must not return an error if the container has// already been removed.RemoveContainer(context.Context, *RemoveContainerRequest) (*RemoveContainerResponse, error)// ListContainers lists all containers by filters.ListContainers(context.Context, *ListContainersRequest) (*ListContainersResponse, error)// ContainerStatus returns status of the container. If the container is not// present, returns an error.ContainerStatus(context.Context, *ContainerStatusRequest) (*ContainerStatusResponse, error)// UpdateContainerResources updates ContainerConfig of the container synchronously.// If runtime fails to transactionally update the requested resources, an error is returned.UpdateContainerResources(context.Context, *UpdateContainerResourcesRequest) (*UpdateContainerResourcesResponse, error)// ReopenContainerLog asks runtime to reopen the stdout/stderr log file// for the container. This is often called after the log file has been// rotated. If the container is not running, container runtime can choose// to either create a new log file and return nil, or return an error.// Once it returns error, new container log file MUST NOT be created.ReopenContainerLog(context.Context, *ReopenContainerLogRequest) (*ReopenContainerLogResponse, error)// ExecSync runs a command in a container synchronously.ExecSync(context.Context, *ExecSyncRequest) (*ExecSyncResponse, error)// Exec prepares a streaming endpoint to execute a command in the container.Exec(context.Context, *ExecRequest) (*ExecResponse, error)// Attach prepares a streaming endpoint to attach to a running container.Attach(context.Context, *AttachRequest) (*AttachResponse, error)// PortForward prepares a streaming endpoint to forward ports from a PodSandbox.PortForward(context.Context, *PortForwardRequest) (*PortForwardResponse, error)// ContainerStats returns stats of the container. If the container does not// exist, the call returns an error.ContainerStats(context.Context, *ContainerStatsRequest) (*ContainerStatsResponse, error)// ListContainerStats returns stats of all running containers.ListContainerStats(context.Context, *ListContainerStatsRequest) (*ListContainerStatsResponse, error)// PodSandboxStats returns stats of the pod sandbox. If the pod sandbox does not// exist, the call returns an error.PodSandboxStats(context.Context, *PodSandboxStatsRequest) (*PodSandboxStatsResponse, error)// ListPodSandboxStats returns stats of the pod sandboxes matching a filter.ListPodSandboxStats(context.Context, *ListPodSandboxStatsRequest) (*ListPodSandboxStatsResponse, error)// UpdateRuntimeConfig updates the runtime configuration based on the given request.UpdateRuntimeConfig(context.Context, *UpdateRuntimeConfigRequest) (*UpdateRuntimeConfigResponse, error)// Status returns the status of the runtime.Status(context.Context, *StatusRequest) (*StatusResponse, error)// CheckpointContainer checkpoints a containerCheckpointContainer(context.Context, *CheckpointContainerRequest) (*CheckpointContainerResponse, error)// GetContainerEvents gets container events from the CRI runtimeGetContainerEvents(*GetEventsRequest, RuntimeService_GetContainerEventsServer) error// ListMetricDescriptors gets the descriptors for the metrics that will be returned in ListPodSandboxMetrics.// This list should be static at startup: either the client and server restart together when// adding or removing metrics descriptors, or they should not change.// Put differently, if ListPodSandboxMetrics references a name that is not described in the initial// ListMetricDescriptors call, then the metric will not be broadcasted.ListMetricDescriptors(context.Context, *ListMetricDescriptorsRequest) (*ListMetricDescriptorsResponse, error)// ListPodSandboxMetrics gets pod sandbox metrics from CRI RuntimeListPodSandboxMetrics(context.Context, *ListPodSandboxMetricsRequest) (*ListPodSandboxMetricsResponse, error)// RuntimeConfig returns configuration information of the runtime.// A couple of notes:// - The RuntimeConfigRequest object is not to be confused with the contents of UpdateRuntimeConfigRequest.// The former is for having runtime tell Kubelet what to do, the latter vice versa.// - It is the expectation of the Kubelet that these fields are static for the lifecycle of the Kubelet.// The Kubelet will not re-request the RuntimeConfiguration after startup, and CRI implementations should// avoid updating them without a full node reboot.RuntimeConfig(context.Context, *RuntimeConfigRequest) (*RuntimeConfigResponse, error)
}// UnimplementedRuntimeServiceServer can be embedded to have forward compatible implementations.
type UnimplementedRuntimeServiceServer struct {
}func (*UnimplementedRuntimeServiceServer) Version(ctx context.Context, req *VersionRequest) (*VersionResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method Version not implemented")
}
func (*UnimplementedRuntimeServiceServer) RunPodSandbox(ctx context.Context, req *RunPodSandboxRequest) (*RunPodSandboxResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method RunPodSandbox not implemented")
}
func (*UnimplementedRuntimeServiceServer) StopPodSandbox(ctx context.Context, req *StopPodSandboxRequest) (*StopPodSandboxResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method StopPodSandbox not implemented")
}
func (*UnimplementedRuntimeServiceServer) RemovePodSandbox(ctx context.Context, req *RemovePodSandboxRequest) (*RemovePodSandboxResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method RemovePodSandbox not implemented")
}
func (*UnimplementedRuntimeServiceServer) PodSandboxStatus(ctx context.Context, req *PodSandboxStatusRequest) (*PodSandboxStatusResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method PodSandboxStatus not implemented")
}
func (*UnimplementedRuntimeServiceServer) ListPodSandbox(ctx context.Context, req *ListPodSandboxRequest) (*ListPodSandboxResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method ListPodSandbox not implemented")
}
func (*UnimplementedRuntimeServiceServer) CreateContainer(ctx context.Context, req *CreateContainerRequest) (*CreateContainerResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method CreateContainer not implemented")
}
func (*UnimplementedRuntimeServiceServer) StartContainer(ctx context.Context, req *StartContainerRequest) (*StartContainerResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method StartContainer not implemented")
}
func (*UnimplementedRuntimeServiceServer) StopContainer(ctx context.Context, req *StopContainerRequest) (*StopContainerResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method StopContainer not implemented")
}
func (*UnimplementedRuntimeServiceServer) RemoveContainer(ctx context.Context, req *RemoveContainerRequest) (*RemoveContainerResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method RemoveContainer not implemented")
}
func (*UnimplementedRuntimeServiceServer) ListContainers(ctx context.Context, req *ListContainersRequest) (*ListContainersResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method ListContainers not implemented")
}
func (*UnimplementedRuntimeServiceServer) ContainerStatus(ctx context.Context, req *ContainerStatusRequest) (*ContainerStatusResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method ContainerStatus not implemented")
}
func (*UnimplementedRuntimeServiceServer) UpdateContainerResources(ctx context.Context, req *UpdateContainerResourcesRequest) (*UpdateContainerResourcesResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method UpdateContainerResources not implemented")
}
func (*UnimplementedRuntimeServiceServer) ReopenContainerLog(ctx context.Context, req *ReopenContainerLogRequest) (*ReopenContainerLogResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method ReopenContainerLog not implemented")
}
func (*UnimplementedRuntimeServiceServer) ExecSync(ctx context.Context, req *ExecSyncRequest) (*ExecSyncResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method ExecSync not implemented")
}
func (*UnimplementedRuntimeServiceServer) Exec(ctx context.Context, req *ExecRequest) (*ExecResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method Exec not implemented")
}
func (*UnimplementedRuntimeServiceServer) Attach(ctx context.Context, req *AttachRequest) (*AttachResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method Attach not implemented")
}
func (*UnimplementedRuntimeServiceServer) PortForward(ctx context.Context, req *PortForwardRequest) (*PortForwardResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method PortForward not implemented")
}
func (*UnimplementedRuntimeServiceServer) ContainerStats(ctx context.Context, req *ContainerStatsRequest) (*ContainerStatsResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method ContainerStats not implemented")
}
func (*UnimplementedRuntimeServiceServer) ListContainerStats(ctx context.Context, req *ListContainerStatsRequest) (*ListContainerStatsResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method ListContainerStats not implemented")
}
func (*UnimplementedRuntimeServiceServer) PodSandboxStats(ctx context.Context, req *PodSandboxStatsRequest) (*PodSandboxStatsResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method PodSandboxStats not implemented")
}
func (*UnimplementedRuntimeServiceServer) ListPodSandboxStats(ctx context.Context, req *ListPodSandboxStatsRequest) (*ListPodSandboxStatsResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method ListPodSandboxStats not implemented")
}
func (*UnimplementedRuntimeServiceServer) UpdateRuntimeConfig(ctx context.Context, req *UpdateRuntimeConfigRequest) (*UpdateRuntimeConfigResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method UpdateRuntimeConfig not implemented")
}
func (*UnimplementedRuntimeServiceServer) Status(ctx context.Context, req *StatusRequest) (*StatusResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method Status not implemented")
}
func (*UnimplementedRuntimeServiceServer) CheckpointContainer(ctx context.Context, req *CheckpointContainerRequest) (*CheckpointContainerResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method CheckpointContainer not implemented")
}
func (*UnimplementedRuntimeServiceServer) GetContainerEvents(req *GetEventsRequest, srv RuntimeService_GetContainerEventsServer) error {return status.Errorf(codes.Unimplemented, "method GetContainerEvents not implemented")
}
func (*UnimplementedRuntimeServiceServer) ListMetricDescriptors(ctx context.Context, req *ListMetricDescriptorsRequest) (*ListMetricDescriptorsResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method ListMetricDescriptors not implemented")
}
func (*UnimplementedRuntimeServiceServer) ListPodSandboxMetrics(ctx context.Context, req *ListPodSandboxMetricsRequest) (*ListPodSandboxMetricsResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method ListPodSandboxMetrics not implemented")
}
func (*UnimplementedRuntimeServiceServer) RuntimeConfig(ctx context.Context, req *RuntimeConfigRequest) (*RuntimeConfigResponse, error) {return nil, status.Errorf(codes.Unimplemented, "method RuntimeConfig not implemented")
}func RegisterRuntimeServiceServer(s *grpc.Server, srv RuntimeServiceServer) {s.RegisterService(&_RuntimeService_serviceDesc, srv)
}
......
5.1.3.cri Manager文件
- 在cri项目中,除了上述文件,还有一个 services 文件:staging/src/k8s.io/cri-api/pkg/apis/services.go
- 该文件定义了
RuntimeService
、ImageManagerService
等接口,用于描述cri提供的具体方法- 客户端可以对这里面的方法自行实现,以更好的封装调用 grpc方法的操作
- 比如 cri-client 就通过实现
RuntimeService
、ImageManagerService
接口的所有方法,对grpc进行更好的封装。- 比如 RunPodSandbox 方法,cri-client实现 RuntimeService 接口,实现逻辑是:通过 .pb.go 中的client,调用grpc方法,实现sandbox的启动,并获取输出
/*
Copyright 2016 The Kubernetes Authors.Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/package criimport ("context""time"runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)// RuntimeVersioner contains methods for runtime name, version and API version.
type RuntimeVersioner interface {// Version returns the runtime name, runtime version and runtime API versionVersion(ctx context.Context, apiVersion string) (*runtimeapi.VersionResponse, error)
}// ContainerManager contains methods to manipulate containers managed by a
// container runtime. The methods are thread-safe.
type ContainerManager interface {// CreateContainer creates a new container in specified PodSandbox.CreateContainer(ctx context.Context, podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error)// StartContainer starts the container.StartContainer(ctx context.Context, containerID string) error// StopContainer stops a running container with a grace period (i.e., timeout).StopContainer(ctx context.Context, containerID string, timeout int64) error// RemoveContainer removes the container.RemoveContainer(ctx context.Context, containerID string) error// ListContainers lists all containers by filters.ListContainers(ctx context.Context, filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error)// ContainerStatus returns the status of the container.ContainerStatus(ctx context.Context, containerID string, verbose bool) (*runtimeapi.ContainerStatusResponse, error)// UpdateContainerResources updates ContainerConfig of the container synchronously.// If runtime fails to transactionally update the requested resources, an error is returned.UpdateContainerResources(ctx context.Context, containerID string, resources *runtimeapi.ContainerResources) error// ExecSync executes a command in the container, and returns the stdout output.// If command exits with a non-zero exit code, an error is returned.ExecSync(ctx context.Context, containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error)// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.Exec(ctx context.Context, request *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)// Attach prepares a streaming endpoint to attach to a running container, and returns the address.Attach(ctx context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)// ReopenContainerLog asks runtime to reopen the stdout/stderr log file// for the container. If it returns error, new container log file MUST NOT// be created.ReopenContainerLog(ctx context.Context, ContainerID string) error// CheckpointContainer checkpoints a containerCheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error// GetContainerEvents gets container events from the CRI runtimeGetContainerEvents(ctx context.Context, containerEventsCh chan *runtimeapi.ContainerEventResponse, connectionEstablishedCallback func(runtimeapi.RuntimeService_GetContainerEventsClient)) error
}// PodSandboxManager contains methods for operating on PodSandboxes. The methods
// are thread-safe.
type PodSandboxManager interface {// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure// the sandbox is in ready state.RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error)// StopPodSandbox stops the sandbox. If there are any running containers in the// sandbox, they should be force terminated.StopPodSandbox(pctx context.Context, odSandboxID string) error// RemovePodSandbox removes the sandbox. If there are running containers in the// sandbox, they should be forcibly removed.RemovePodSandbox(ctx context.Context, podSandboxID string) error// PodSandboxStatus returns the Status of the PodSandbox.PodSandboxStatus(ctx context.Context, podSandboxID string, verbose bool) (*runtimeapi.PodSandboxStatusResponse, error)// ListPodSandbox returns a list of Sandbox.ListPodSandbox(ctx context.Context, filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error)// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.PortForward(ctx context.Context, request *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)
}// ContainerStatsManager contains methods for retrieving the container
// statistics.
type ContainerStatsManager interface {// ContainerStats returns stats of the container. If the container does not// exist, the call returns an error.ContainerStats(ctx context.Context, containerID string) (*runtimeapi.ContainerStats, error)// ListContainerStats returns stats of all running containers.ListContainerStats(ctx context.Context, filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error)// PodSandboxStats returns stats of the pod. If the pod does not// exist, the call returns an error.PodSandboxStats(ctx context.Context, podSandboxID string) (*runtimeapi.PodSandboxStats, error)// ListPodSandboxStats returns stats of all running pods.ListPodSandboxStats(ctx context.Context, filter *runtimeapi.PodSandboxStatsFilter) ([]*runtimeapi.PodSandboxStats, error)// ListMetricDescriptors gets the descriptors for the metrics that will be returned in ListPodSandboxMetrics.ListMetricDescriptors(ctx context.Context) ([]*runtimeapi.MetricDescriptor, error)// ListPodSandboxMetrics returns metrics of all running pods.ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error)
}// RuntimeService interface should be implemented by a container runtime.
// The methods should be thread-safe.
type RuntimeService interface {RuntimeVersionerContainerManagerPodSandboxManagerContainerStatsManager// UpdateRuntimeConfig updates runtime configuration if specifiedUpdateRuntimeConfig(ctx context.Context, runtimeConfig *runtimeapi.RuntimeConfig) error// Status returns the status of the runtime.Status(ctx context.Context, verbose bool) (*runtimeapi.StatusResponse, error)// RuntimeConfig returns the configuration information of the runtime.RuntimeConfig(ctx context.Context) (*runtimeapi.RuntimeConfigResponse, error)
}// ImageManagerService interface should be implemented by a container image
// manager.
// The methods should be thread-safe.
type ImageManagerService interface {// ListImages lists the existing images.ListImages(ctx context.Context, filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error)// ImageStatus returns the status of the image.ImageStatus(ctx context.Context, image *runtimeapi.ImageSpec, verbose bool) (*runtimeapi.ImageStatusResponse, error)// PullImage pulls an image with the authentication config.PullImage(ctx context.Context, image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error)// RemoveImage removes the image.RemoveImage(ctx context.Context, image *runtimeapi.ImageSpec) error// ImageFsInfo returns information of the filesystem(s) used to store the read-only layers and the writeable layer.ImageFsInfo(ctx context.Context) (*runtimeapi.ImageFsInfoResponse, error)
}