用go从零构建写一个RPC(仿gRPC,tRPC)--- 版本1

希望借助手写这个go的中间件项目,能够理解go语言的特性以及用go写中间件的优势之处,同时也是为了更好的使用和优化公司用到的trpc,并且作者之前也使用过grpc并有一定的兴趣,所以打算从0构建一个rpc系统,对于生产环境已经投入使用的项目抽丝剥茧后,再从0构建,从而更好的理解这个项目和做一个RPC需要注意的地方

打算分为多个版本,从最基本的功能,到逐渐加入新功能和新特性,不断的完善。其中也有一些作者本身的思考优化,其中不足和有误之处还望大家指正

代码地址(目前已经有两个版本): https://github.com/karatttt/MyRPC

Server端

rpc首先有多个service,每一个service对应多个方法,当请求到来时再正确路由到对应的方法,通过server端处理后返回client端。所以server端主要做的就是一:注册service和对应的method,二:解析配置文件启动Server, 三:能够正确路由到来的请求并返回client。

service和Method的注册

grpc和trpc都是使用protobuf作为序列化格式,这里我们的项目也用protobuf格式进行序列化,成熟的rpc项目正常会有对应的工具,我们写好proto文件和对应的service的实现类后,使用自动化构建工具可以生成桩代码,包括以下部分:

  1. 消息类(Message Struct): 把你 .proto 里面定义的请求、响应对象变成对应的语言结构体,比如 UserRequest、UserReply
  2. 服务接口(Service Interface): 把你 .proto 里面定义的方法变成一组接口或基类,供你实现,比如 GetUser(ctx, req)
  3. 客户端 Stub :客户端可以直接用来调用远程方法的代码(自动封装了序列化、网络传输、重试等逻辑),类似于java的动态代理
  4. 服务端 Stub :服务端接收到请求后,自动反序列化,然后回调你实现的业务逻辑,也类似于java的动态代理

这里我们尝试通过一个proto文件,自己实现一个server端的桩代码

syntax = "proto3";package myrpc.helloworld;
option go_package="/pb";service Greeter {rpc Hello (HelloRequest) returns (HelloReply) {}
}message HelloRequest {string msg = 1;
}message HelloReply {string msg = 1;
}

第一步,根据填的方法写一个接口:

// 具体方法接口
type HelloServer interface {// SayHello is the method that will be called by the client.Hello(req *HelloRequest) (*HelloReply, error)
}

第二步,我们对每一个方法写一个handler,写实际上的处理逻辑,即如何反序列化,然后回调实际写的业务逻辑,再返回结构体。具体的如何序列化反序列化实现我们后面再看

func HelloServer_Hello_Handler(srv interface{}, req []byte) (interface{}, error) {// 这里的srv是HelloServer的实现类,我们自己写的// 通过类型断言将srv转换为HelloServer类型helloServer, ok := srv.(HelloServer)if !ok {return nil, fmt.Errorf("HelloServer_Hello_Handler: %v", "type assertion failed")}// 调用HelloServer的Hello方法// 将req反序列化reqBody := &HelloRequest{}err := codec.Unmarshal(req, reqBody)if err != nil {return nil, fmt.Errorf("HelloServer_Hello_Handler: %v", err)}// 调用实际我们写的业务逻辑reply, err := helloServer.Hello(reqBody)if err != nil {fmt.Printf("HelloServer_Hello_Handler: %v", err)return nil, err}return reply, nil
}

第三步,我们写了Handler,当然要让server端能够路由到这个Handler,所以这个Handler需要绑定一个方法名和服务名,作为key保存再server端的一个map里,这样就可以正确路由。所以我们可以写一个方法,将这个映射关系注册到server里。

这个server的Register方法,我们后面再来实现。

// 映射关系
var HelloServer_ServiceDesc = server.ServiceDesc{ServiceName: "helloworld",HandlerType: (*HelloServer)(nil),Methods: []server.MethodDesc{{MethodName: "Hello",// 当接受到客户端调用的Hello方法时,server将会调用这个方法Func:    HelloServer_Hello_Handler,},},}// 绑定方法
func RegisterHelloServer(s *server.Server, svr interface{}) error {if err := s.Register(&HelloServer_ServiceDesc, svr); err != nil {panic(fmt.Sprintf("Greeter register error:%v", err))}return nil
}

Server端的启动

  • Server启动的时候,需要根据我们写的配置文件以得知每一个service的name,以及他们对应的ip和端口号(当然后续还有其他的配置),正常多个service的ip和端口号是一样的,也就是说serve启动的时候,统一暴露一个端口用于rpc调用。
  • 所以server启动的流程是:一:读取配置,二:根据配置名创建多个service并保存
func NewServer() *Server {// 1. 创建一个Server实例server := &Server{services: make(map[string]Service),}// 2. 读取配置文件config, err := loadConfig("./rpc.yaml")if err != nil {fmt.Print("读取配置文件出错")}// 3. 创建服务for _, svc := range config.Server.Service {// 创建服务,这里创建了service实例service := NewService(svc.Name, WithAddress(fmt.Sprintf("%s:%d", svc.IP, svc.Port)))// 添加到服务映射server.services[svc.Name] = service}return server
}

Service类的实现

前面server端启动的时候创建了所有的service类,这里我们看看具体service应该做什么。

当请求进来时,首先找到service,再找到对应的Method,所以service应该持有method的map,以及在这里实现前面提到的Register逻辑。

同时,每一个service应该有一个serve方法,即提供服务,就是这里开始监听请求,路由和处理,这个后续会详细展开。

我们再为service实现Handler接口,赋予处理业务逻辑的能力,这个接口就是为了路由找到service里的method并调用它,这个Handler详细我们后面再看

// 定义接口,提供一些服务的注册和开启服务的功能
type Service interface {// Register registers a service with the server.// The serviceName is the name of the service, and service is the implementation of the service.Register(serviceDesc *ServiceDesc, service interface{}) error// Serve starts the server and listens for incoming connections.Serve(address string) error
}
// 定义一个Handler接口,service实现了这个接口
type Handler interface {Handle(ctx context.Context, frame []byte) (rsp []byte, err error)
}

我们先看看比较简单的regsiter方法,虽然registerMethods看起来复杂,但是实际上就是将前面桩代码的Handler作为一个函数存在map里

// 实现service的Register方法,填充service的各个属性
func (s *service) Register(serviceDesc *ServiceDesc, service interface{}) error {// 初始化Transports.opts.Transport = transport.DefaultServerTransports.registerMethods(serviceDesc.Methods, service)return nil
}// 注册普通方法
func (s *service) registerMethods(methods []MethodDesc, serviceImpl interface{}) error {for _, method := range methods {if _, exists := s.handler[method.MethodName]; exists {return fmt.Errorf("duplicate method name: %s", method.MethodName)}s.handler[method.MethodName] = func(req []byte) (rsp interface{}, err error) {if fn, ok := method.Func.(func(svr interface{}, req []byte) (rsp interface{}, err error)); ok {// 这里调用的就是rpc.go里面的实际的handler方法return fn(serviceImpl, req)}return nil, fmt.Errorf("method.Func is not a valid function")}}return nil
}

Service类的Server方法处理请求

func (s *service) Serve(address string) error {fmt.Printf("Server is listening on %s\n", address)// 将service作为Handler传入transport,后续接收到请求,会调用service的Handle方法s.opts.Transport.RegisterHandler(s)err := s.opts.Transport.ListenAndServe(context.Background(), "tcp", address)if err != nil {return fmt.Errorf("failed to listen: %v", err)}return nil
}
  • 这个Serve方法会在Server端启动的时候,依次触发每一个service类的这个Serve方法,意即为每一个service提供处理请求的能力
  • 这里做了一个serverTransport主要负责网络请求,我们重点关注ListenAndServe
// ListenAndServe 监听并处理 TCP 连接
func (t *serverTransport) ListenAndServe(ctx context.Context, network, address string) error {ln, err := net.Listen(network, address)if err != nil {return fmt.Errorf("failed to listen: %w", err)}defer ln.Close()go func() {<-ctx.Done()ln.Close()}()return t.serveTCP(ctx, ln)
}// serveTCP 处理 TCP 连接
func (t *serverTransport) serveTCP(ctx context.Context, ln net.Listener) error {fmt.Print("开始监听TCP连接")for {conn, err := ln.Accept()if err != nil {select {case <-ctx.Done():return nil // 退出监听default:fmt.Println("accept error:", err)}continue}go t.handleConnection(ctx, conn)}
}
// handleConnection 处理单个连接
func (t *serverTransport) handleConnection(ctx context.Context, conn net.Conn) {//TODO 这里可以做一个处理业务逻辑的协程池// 实际上每个连接一个协程,同时负责读取请求并直接处理业务逻辑也是可行的,读取请求时如果阻塞,Go调度器会自动切换到其他协程执行// 但是协程池可以限制同时处理业务逻辑的协程数量,避免请求量大时,过多协程导致的资源消耗// 这里是处理完一个请求就释放连接,后续可以考虑长连接defer conn.Close()fmt.Println("New connection from", conn.RemoteAddr())// 读取帧frame, err := codec.ReadFrame(conn)if err != nil {fmt.Println("read frame error:", err)return}// 调用service的Handler执行结果response, err := t.ConnHandler.Handle(ctx, frame)if err != nil {fmt.Println("handle error:", err)return}// 发送响应,此时已经是完整帧conn.Write(response)
}
  • 以上的代码简单来说就是,开启一个coonection,for循环accept请求,一旦请求到达,开启协程进行实际的业务逻辑处理
  • 这个 t.ConnHandler.Handle(ctx, frame),实际上就是service里的Handler方法,当transport收到请求时,回到我们的service的Handler方法执行。
  • 对于codec.ReadFrame(conn)我们下面重点看看

Service类的Handler方法

接收到请求,我们的处理过程应该是这样:

  1. 接收codec.ReadFrame后得到原始字节流(frame)
  2. 解码frame
  3. 调用对应的业务方法 handler(其间反序列化)
  4. 把业务返回结果序列化
  5. 编码生成frame返回给调用方

首先设计Frame结构如下:
在这里插入图片描述

ReadFrame

即根据帧头读取一段完整的自定义协议数据,解决半包和粘包问题,先读16字节的帧头解析各字段,校验魔数和版本号,再根据帧头中记录的协议数据长度和消息体长度继续读取剩下的内容,最后把帧头和帧体拼成一个完整的字节数组返回。


func ReadFrame(conn net.Conn) ([]byte, error) {buf := bufio.NewReader(conn)// 读取帧头headerBuf := make([]byte, HeaderLength)n, err := io.ReadFull(buf, headerBuf)if err != nil {return nil, fmt.Errorf("read header error: %v, read %d bytes", err, n)}// 正确解析所有字段header := FrameHeader{MagicNumber:    binary.BigEndian.Uint16(headerBuf[0:2]),Version:        headerBuf[2],MessageType:    headerBuf[3],SequenceID:     binary.BigEndian.Uint32(headerBuf[4:8]),ProtocolLength: binary.BigEndian.Uint32(headerBuf[8:12]),BodyLength:     binary.BigEndian.Uint32(headerBuf[12:16]),}if header.MagicNumber != MagicNumber {return nil, fmt.Errorf("invalid magic number: %d", header.MagicNumber)}if header.Version != Version {return nil, fmt.Errorf("unsupported version: %d", header.Version)}// 读取协议数据 + 消息体frameBody := make([]byte, header.ProtocolLength+header.BodyLength)_, err = io.ReadFull(buf, frameBody)if err != nil {return nil, fmt.Errorf("read body error: %v", err)}// 拼接完整帧frame := append(headerBuf, frameBody...)return frame, nil
}
Decode

读取到Frame后,需要解析出其中的消息体,并将读取到的协议数据存起来

func (c *servercodec) Decode(msg internel.Message, frame []byte) ([]byte, error) {// 解析帧头header := FrameHeader{MagicNumber:    binary.BigEndian.Uint16(frame[0:]),Version:        frame[2],MessageType:    frame[3],SequenceID:     binary.BigEndian.Uint32(frame[4:]),ProtocolLength: binary.BigEndian.Uint32(frame[8:]),BodyLength:     binary.BigEndian.Uint32(frame[12:]),}// 验证魔数和版本if header.MagicNumber != MagicNumber {return nil, fmt.Errorf("invalid magic number: %d", header.MagicNumber)}if header.Version != Version {return nil, fmt.Errorf("unsupported version: %d", header.Version)}// 提取协议数据protocolData := frame[HeaderLength : HeaderLength+header.ProtocolLength]// 解析协议数据proto, err := DeserializeProtocolData(protocolData)if err != nil {return nil, fmt.Errorf("parse protocol data error: %v", err)}// 设置到消息中msg.WithServiceName(proto.ServiceName)msg.WithMethodName(proto.MethodName)// 返回消息体return frame[HeaderLength+header.ProtocolLength:], nil
}
Unmarshal

得到消息体后,还是字节数组,这个时候根据protobuf的格式,反序列化成对应的结构体(这个方法的调用在前面的桩代码的HelloServer_Hello_Handler里)

// Unmarshal 将 protobuf 字节数组反序列化为结构体
func Unmarshal(rspDataBuf []byte, rspBody interface{}) error {msg, ok := rspBody.(proto.Message)if !ok {return fmt.Errorf("Unmarshal: rspBody does not implement proto.Message")}return proto.Unmarshal(rspDataBuf, msg)
}

反序列化后,即可处理业务逻辑,返回的响应的结构仍需要序列化,编码(补充协议数据和帧头),返回客户端,这里就不再详细说明

这里先写server,client下一篇文章再讲

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/903056.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【学习笔记】Stata

一、Stata简介 Stata 是一种用于数据分析、数据管理和图形生成的统计软件包&#xff0c;广泛应用于经济学、社会学、政治科学等社会科学领域。 二、Stata基础语法 2.1 数据管理 Stata 支持多种数据格式的导入&#xff0c;包括 Excel、CSV、文本文件等。 从 Excel 文件导入…

Redis数据结构SDS,IntSet,Dict

目录 1.字符串&#xff1a;SDS 1.1.为什么叫做动态字符串 2.IntSet 2.1.inset如何保存大于当前编码的最大数字&#xff1f; 3.Dict 3.1Dict的扩容 3.2Dict的收缩 3.3.rehash 1.字符串&#xff1a;SDS SDS的底层是C语言编写的构建的一种简单动态字符串 简称SDS&#xff…

Maven的聚合工程与继承

目录 一、为什么需要使用Maven工程 二、聚合工程的结构 三、聚合工程实现步骤 四、父工程统一管理版本 五、编译打包 大家好&#xff0c;我是jstart千语。想着平时开发项目似乎都是用maven来管理的&#xff0c;并且大多都是聚合工程。而且在maven的聚合工程中&#xff0c…

前端职业发展:如何规划前端工程师的成长路径?

前端职业发展:如何规划前端工程师的成长路径? 大家好,我是全栈老李。今天咱们聊聊前端工程师的职业发展路径,这个话题看似简单,实则暗藏玄机。就像打游戏升级一样,你得知道下一关是什么,才能提前准备装备和技能点。 前端之路 一般我们从一个新手到大神,普遍需要经过…

【星海出品】分布式存储数据库etcd

etcd 数据库由 CoreOS 公司创建。 https://github.com/etcd-io/etcd api信息 https://etcd.io/docs/v3.5/dev-guide/api_reference_v3/ etcdctl --help etcd 最初由 CoreOS 公司开发&#xff0c;作为其核心项目之一。 CoreOS 成立于 2013 年&#xff0c;专注于容器化技术&#…

2025新版修复蛇年运势测试风水起名系统源码

2025新版修复蛇年运势测试风水起名系统源码 通过网盘分享的文件&#xff1a;2025xbfsysweb.rar 链接: https://pan.baidu.com/s/1r1MOkJJJMj9s9nQX_GzI3Q 提取码: 9weh 备用下载地址&#xff1a;http://pan.1234f.com:5212/s/JK1uw

Vue3 Pinia

一、Pinia 核心概念 Pinia 是 Vue3 官方推荐的状态管理库&#xff0c;相比 Vuex 4&#xff0c;具有以下优势&#xff1a; 更简洁的 API&#xff08;移除 mutations&#xff09; 完整的 TypeScript 支持 支持组合式 API 自动代码分割 轻量级&#xff08;仅 1KB&#xff09;…

音视频小白系统入门课-4

本系列笔记为博主学习李超老师课程的课堂笔记&#xff0c;仅供参阅 往期课程笔记传送门&#xff1a; 音视频小白系统入门笔记-0音视频小白系统入门笔记-1音视频小白系统入门笔记-2音视频小白系统入门笔记-3 将mp4文件转换为yuv文件 ffmpeg -i demo.mp4 # 输入文件-an …

6.2 内容生成与营销:个性化内容创作与营销策略优化

随着消费者对个性化体验的需求日益增长&#xff0c;传统的内容创作与营销方式已难以满足市场竞争的需要。基于大语言模型&#xff08;LLM&#xff09;与智能代理&#xff08;Agent&#xff09;的技术为企业提供了全新的解决方案&#xff0c;能够实现高效、精准、规模化的内容生…

kafka课后总结

Kafka是由LinkedIn开发的分布式发布 - 订阅消息系统&#xff0c;具备高吞吐量、低延迟、可扩展性、持久性、可靠性、容错性和高并发等特性。其主要角色包括Broker、Topic、Partition、Producer、Consumer、Consumer Group、replica、leader、follower和controller。消息系统中存…

DataStreamAPI实践原理——计算模型

引入 通过前面我们对于Flink的理解&#xff0c;我们知道它吸收了 Dataflow 的理念&#xff0c;以及此前已有的流处理系统&#xff08;如 S4、Storm、MillWheel&#xff09;的经验&#xff0c;实现了批流一体化的高效数据处理&#xff0c;并且通过灵活的窗口机制、事件时间与水…

项目笔记1:通用 Service的常见方法

通用 Service 通常封装了常见的业务逻辑操作&#xff0c;以提高代码的复用性和可维护性。不同的框架和业务场景下&#xff0c;通用 Service 的方法会有所差异&#xff0c;但一般都会包含一些基本的增删改查&#xff08;CRUD&#xff09;操作&#xff0c;以下为你详细介绍&#…

阿里云99机器总是宕机,实测还是磁盘性能差

阿里云99计划总是宕机&#xff0c;经过反复排查&#xff0c;最终确认还是磁盘性能差。 阿里云99机器使用的磁盘类型是Entry云盘40GiB (2120 IOPS) 按照官方的一些数据&#xff0c;这个磁盘最小iops是1800最大是6000,实际使用中发现&#xff0c;这个6000值很虚&#xff0c;这个…

Fedora 43 计划移除所有 GNOME X11 相关软件包

Fedora 43 计划移除所有 GNOME X11 相关软件包&#xff0c;这是 Fedora 项目团队为全面拥抱 Wayland 所做的重要决策。以下是关于此计划的详细介绍&#xff1a; 提案内容&#xff1a;4 月 23 日&#xff0c;Neal Gompa 提交提案&#xff0c;建议从 Fedora 软件仓库中移除所有 G…

魔幻预言手游》:职业介绍!

在《魔幻预言》手游中&#xff0c;共有武玄、魔魅、剑仙三大核心职业&#xff0c;各具特色且定位鲜明&#xff0c;以下为具体介绍&#xff1a; 一、武玄&#xff08;战士&#xff09; 核心定位&#xff1a;近战物理输出与团队增益担当&#xff0c;兼具控制与防御能力。 战斗风…

精益数据分析(27/126):剖析用户价值与商业模式拼图

精益数据分析&#xff08;27/126&#xff09;&#xff1a;剖析用户价值与商业模式拼图 在创业和数据分析的领域中&#xff0c;每一次深入学习都是一次成长的契机。今天&#xff0c;我们继续秉持共同进步的理念&#xff0c;深入研读《精益数据分析》&#xff0c;剖析用户价值的…

【SwitchyOmega安装教程】

目录 一、插件安装 1. 下载安装文件 2. 打开浏览器扩展安装页面 3. 安装插件 二、界面详情 三、配置信息 3.1 设置IP 1、查看IP地址信息 2、批量测试IP是否有效 3、点击扩展程序&#xff0c;选择 Proxy SwitchyOmega 4、 点击选项进行配置 5、配置页面 一、插件安装 1…

矫平机终极指南:特殊材料处理、工艺链协同与全球供应链管理

一、特殊材料矫平&#xff1a;挑战与创新解决方案 1. 高温合金&#xff08;如Inconel 718&#xff09;处理 技术难点&#xff1a; 屈服强度高达1100 MPa&#xff0c;传统矫平力不足 高温下易氧化&#xff0c;需惰性气体保护环境 解决方案&#xff1a; 采用双伺服电机驱动&a…

反事实——AI与思维模型【82】

一、定义 反事实思维模型是一种心理认知模型,它指的是人们在头脑中对已经发生的事件进行否定,然后构建出一种可能性假设的思维活动。简单来说,就是思考“如果当时……,那么就会……”的情景。这种思维方式让我们能够超越现实的限制,设想不同的可能性和结果,从而对过去的…

Nginx:支持 HTTPS

文章目录 Nginx 开启 ssl 以支持 HTTPS1 生成本地证书2 开启 ssl 以支持 HTTPS3 将 https 的请求转发给 http 最终的 nginx.conf 如下 Nginx 开启 ssl 以支持 HTTPS [!IMPORTANT] 在下文中&#xff0c;将采用如下定义。 HTTP端口&#xff1a; 80 HTTPS端口&#xff1a; 443 服务…