go-zero开发入门之gateway深入研究1

创建一个 gateway 示例:

// main.go
package mainimport ("flag""fmt""gateway/middleware""github.com/zeromicro/go-zero/core/conf""github.com/zeromicro/go-zero/gateway"
)var configFile = flag.String("f", "etc/gateway.yaml", "the config file")func main() {var c gateway.GatewayConfflag.Parse()// 加载 gateway 配置,如果配置有问题记录 FATAL 日志后即退出conf.MustLoad(*configFile, &c)// 实例化 gateway,如果出错记录 FATAL 日志后即退出// 可能的出错包括:// 1)初始化日志 logx 失败(创建日志文件失败),日志文件含以下五种://    信息级别的日志:infoLog//    错误级别的日志:errorLog//    严重级别的日志:severeLog//    慢查询日志:slowLog//    统计日志:statLog// 而堆栈日志 stackLog 同 errorLog 一起,访问日志 access 同 infoLog 。server := gateway.MustNewServer(c)defer server.Stop()fmt.Printf("Starting gateway at %s:%d...\n", c.Host, c.Port)server.Start()
}
// gateway/server.go
// MustNewServer creates a new gateway server.
func MustNewServer(c GatewayConf, opts ...Option) *Server {svr := &Server{upstreams: c.Upstreams,Server:    rest.MustNewServer(c.RestConf),}for _, opt := range opts {opt(svr)}return svr
}// rest/server.go
// MustNewServer returns a server with given config of c and options defined in opts.
// Be aware that later RunOption might overwrite previous one that write the same option.
// The process will exit if error occurs.
func MustNewServer(c RestConf, opts ...RunOption) *Server {server, err := NewServer(c, opts...)if err != nil {logx.Must(err)}return server
}

gateway.MustNewServer 调用了 rest.MustNewServer,但在 rest.MustNewServer 增加了 upstreams 的初始化。upstreams 源自于 gateway.GatewayConf,对应的配置如下:。

Upstreams: # 网关上游的配置列表- Grpc: # 网关上游只能为 grpc 服务,不支持 http 等服务其它服务Etcd: # 服务发现用的 Etcd 配置Hosts: # Etcd 的服务地址列表- 127.0.0.1:2379Key: login.rpc # 服务注册在 Etcd 的 keyProtoSets: # 服务的 pb 文件列表(使用工具 protoc 根据 proto 生成 pb 文件:protoc --descriptor_set_out=login.pb login.proto)- proto/login.pbMappings: # Mappings can also be written in proto options 定义 http 路径到 rpc 路径的映射列表- Method: getPath: /v1/loginRpcPath: login.Login/login // 格式:包名.服务名/方法名

从上述内容可以看出,go-zero 的 gateway 在 rest 基础上增加了 upstreams 。当然不仅这一些,在 gateway 启动时也增加了特有的东西:

// Start starts the gateway server.
func (s *Server) Start() {logx.Must(s.build()) // 这也是 gateway 在 rest 基础上新增的s.Server.Start()
}

上述 s.build() 的源代码如下:

// gateway/server.go
func (s *Server) build() error {// 调用 s.ensureUpstreamNames() 确保所有上游服务(gRPC 服务)的名称都是唯一的,// 如果有重复的名称,函数返回错误。if err := s.ensureUpstreamNames(); err != nil {return err}// 使用 mr.MapReduceVoid 函数进行 MapReduce 操作,这个函数接收三个参数:// 1)一个用于生成数据源的函数// 2)一个 Map 函数// 3)一个 Reduce 函数return mr.MapReduceVoid(func(source chan<- Upstream) {// 生成数据源的函数:// 遍历 s.upstreams(上游服务列表),将每个上游服务发送到 Map 函数for _, up := range s.upstreams {source <- up}}, func(up Upstream, writer mr.Writer[rest.Route], cancel func(error)) { // Map 函数,对于每个上游服务,执行以下操作:var cli zrpc.Client// 创建一个 gRPC 客户端 cli,用于与上游服务通信if s.dialer != nil {cli = s.dialer(up.Grpc)} else {cli = zrpc.MustNewClient(up.Grpc)}// 调用 s.createDescriptorSource(cli, up) 创建一个描述符源 grpcurl.DescriptorSource),// 用于获取 gRPC 服务的元数据。source, err := s.createDescriptorSource(cli, up)if err != nil {cancel(fmt.Errorf("%s: %w", up.Name, err))return}// 使用 internal.GetMethods(source) 获取 gRPC 服务的所有方法methods, err := internal.GetMethods(source)if err != nil {cancel(fmt.Errorf("%s: %w", up.Name, err))return}// 创建一个 gRPCurl 解析器,用于解析 gRPC 方法的元数据resolver := grpcurl.AnyResolverFromDescriptorSource(source)// 遍历这些方法,为每个具有 HTTP 方法和路径的方法生成一个 HTTP 处理器(s.buildHandler(...)),// 并将它们映射到 RESTful API 的路由上。for _, m := range methods {if len(m.HttpMethod) > 0 && len(m.HttpPath) > 0 {writer.Write(rest.Route{Method:  m.HttpMethod,Path:    m.HttpPath,// http 调用转为 rpc 调用Handler: s.buildHandler(source, resolver, cli, m.RpcPath),})}}methodSet := make(map[string]struct{})for _, m := range methods {methodSet[m.RpcPath] = struct{}{}}// 遍历 up.Mappings(自定义的 RESTful API 映射),// 为每个映射生成一个 HTTP 处理器,并将生成的路由写入到 Reduce 函数。// 如果映射中指定的 gRPC 方法不存在,则返回错误。for _, m := range up.Mappings {// 在将方法映射到路由之前,函数会检查映射是否存在,如果不存在则返回错误if _, ok := methodSet[m.RpcPath]; !ok {cancel(fmt.Errorf("%s: rpc method %s not found", up.Name, m.RpcPath))return}writer.Write(rest.Route{Method:  strings.ToUpper(m.Method),Path:    m.Path,// 调用 buildHandler 函数来构建一个处理器,用于处理 RESTful API 请求Handler: s.buildHandler(source, resolver, cli, m.RpcPath),})}}, func(pipe <-chan rest.Route, cancel func(error)) {// Reduce 函数:// 从管道中读取生成的路由,并将它们添加到 HTTP 服务器(s.Server)中for route := range pipe {s.Server.AddRoute(route)}})
}

这个函数的主要目的是将 gRPC 服务的方法映射到 HTTP RESTful API,并将生成的 API 添加到 HTTP 服务器中。通过这种方式,可以在 gRPC 服务的基础上提供一个 RESTful API,使得客户端可以使用 HTTP 调用 gRPC 服务。

下为 mr.MapReduceVoid 的源代码:

// core/mr/mapreduce.go
// MapReduceVoid maps all elements generated from given generate,
// and reduce the output elements with given reducer.
func MapReduceVoid[T, U any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer VoidReducerFunc[U], opts ...Option) error {_, err := MapReduce(generate, mapper, func(input <-chan U, writer Writer[any], cancel func(error)) {reducer(input, cancel)}, opts...)if errors.Is(err, ErrReduceNoOutput) {return nil}return err
}// MapReduce maps all elements generated from given generate func,
// and reduces the output elements with given reducer.
func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V], opts ...Option) (V, error) {panicChan := &onceChan{channel: make(chan any)}source := buildSource(generate, panicChan)return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
}
// gateway/server.go
func (s *Server) buildHandler(source grpcurl.DescriptorSource, resolver jsonpb.AnyResolver,cli zrpc.Client, rpcPath string) func(http.ResponseWriter, *http.Request) {return func(w http.ResponseWriter, r *http.Request) {parser, err := internal.NewRequestParser(r, resolver)if err != nil {httpx.ErrorCtx(r.Context(), w, err)return}w.Header().Set(httpx.ContentType, httpx.JsonContentType)handler := internal.NewEventHandler(w, resolver)// http 调用转成了 grpc 调用if err := grpcurl.InvokeRPC(r.Context(), source, cli.Conn(), rpcPath, s.prepareMetadata(r.Header),handler, parser.Next); err != nil {httpx.ErrorCtx(r.Context(), w, err)}st := handler.Statusif st.Code() != codes.OK {httpx.ErrorCtx(r.Context(), w, st.Err())}}
}

http 调用转 grpc 调用过程复杂,最终调用了 grpc-go 的 Invoke:

// https://github.com/grpc/grpc-go/blob/master/clientconn.go
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs.  It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
type ClientConnInterface interface { // ClientConn 实现了该接口,实现落在两个文件中:clientconn.go 和 call.go// Invoke performs a unary RPC and returns after the response is received// into reply.Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error// NewStream begins a streaming RPC.NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}

中间还用到了 grpcdynamic 的 Stub.InvokeRpc:

// https://github.com/jhump/protoreflect/blob/main/dynamic/grpcdynamic/stub.go
// InvokeRpc sends a unary RPC and returns the response. Use this for unary methods.
func (s Stub) InvokeRpc(ctx context.Context, method *desc.MethodDescriptor, request proto.Message, opts ...grpc.CallOption) (proto.Message, error) {if method.IsClientStreaming() || method.IsServerStreaming() {return nil, fmt.Errorf("InvokeRpc is for unary methods; %q is %s", method.GetFullyQualifiedName(), methodType(method))}if err := checkMessageType(method.GetInputType(), request); err != nil {return nil, err}resp := s.mf.NewMessage(method.GetOutputType())if err := s.channel.Invoke(ctx, requestMethod(method), request, resp, opts...); err != nil {return nil, err}return resp, nil
}
// https://github.com/grpc/grpc-go/blob/master/call.go
package grpcimport ("context"
)// Invoke sends the RPC request on the wire and returns after response is
// received.  This is typically called by generated code.
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {// allow interceptor to see all applicable call options, which means those// configured as defaults from dial option as well as per-call optionsopts = combine(cc.dopts.callOptions, opts)if cc.dopts.unaryInt != nil {return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)}return invoke(ctx, method, args, reply, cc, opts...)
}func invoke(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {// cs 类型为,// 结构体 clientStream 实现了接口 ClientStreamcs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)if err != nil {return err}if err := cs.SendMsg(req); err != nil { // 发送请求return err}return cs.RecvMsg(reply) // 接收响应
}// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {// Header returns the header metadata received from the server if there// is any. It blocks if the metadata is not ready to read.  If the metadata// is nil and the error is also nil, then the stream was terminated without// headers, and the status can be discovered by calling RecvMsg.Header() (metadata.MD, error)// Trailer returns the trailer metadata from the server, if there is any.// It must only be called after stream.CloseAndRecv has returned, or// stream.Recv has returned a non-nil error (including io.EOF).Trailer() metadata.MD// CloseSend closes the send direction of the stream. It closes the stream// when non-nil error is met. It is also not safe to call CloseSend// concurrently with SendMsg.CloseSend() error// Context returns the context for this stream.//// It should not be called until after Header or RecvMsg has returned. Once// called, subsequent client-side retries are disabled.Context() context.Context// SendMsg is generally called by generated code. On error, SendMsg aborts// the stream. If the error was generated by the client, the status is// returned directly; otherwise, io.EOF is returned and the status of// the stream may be discovered using RecvMsg.//// SendMsg blocks until://   - There is sufficient flow control to schedule m with the transport, or//   - The stream is done, or//   - The stream breaks.//// SendMsg does not wait until the message is received by the server. An// untimely stream closure may result in lost messages. To ensure delivery,// users should ensure the RPC completed successfully using RecvMsg.//// It is safe to have a goroutine calling SendMsg and another goroutine// calling RecvMsg on the same stream at the same time, but it is not safe// to call SendMsg on the same stream in different goroutines. It is also// not safe to call CloseSend concurrently with SendMsg.//// It is not safe to modify the message after calling SendMsg. Tracing// libraries and stats handlers may use the message lazily.SendMsg(m any) error// RecvMsg blocks until it receives a message into m or the stream is// done. It returns io.EOF when the stream completes successfully. On// any other error, the stream is aborted and the error contains the RPC// status.//// It is safe to have a goroutine calling SendMsg and another goroutine// calling RecvMsg on the same stream at the same time, but it is not// safe to call RecvMsg on the same stream in different goroutines.RecvMsg(m any) error
}

调用路径归纳总结:

   grpcurl/grpcurl.InvokeRPC()/invoke.go
-> grpcdynamic/Stub.InvokeRpc()/stub.go
-> grpc-go/grpc.ClientConn.Invoke()/clientconn.go|call.go // ClientConn 是一个 struct,实现了接口 ClientConnInterface
-> grpc-go/grpc.invoke()/call.go // invoke 是 grpc 下的全局私有函数
-> grpc-go/grpc.clientStream::SendMsg()/stream.go // clientStream 是一个 struct,实现了接口 ClientStream
-> grpc-go/grpc.csAttempt::SendMsg()/stream.go // csAttempt 是一个 struct,实现了接口 ClientTransport
-> grpc-go/grpc.ClientTransport::write()/internal/transport/transport.go // ClientTransport 是一个接口,结构体 http2Client 实现了 ClientTransport
-> grpc-go/grpc.http2Client::Write()/internal/transport/http2_client.go // 结构体 http2Client 实现了 ClientTransport,将数据写入 http2Client.controlBuf 中

http2Client::Write 将数据写入 http2Client.controlBuf 后返回,数据的发送由另外的协程 loopyWriter.run() 负责:

// https://github.com/grpc/grpc-go/blob/master/internal/transport/controlbuf.go
//
// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, or/and
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked-list.
// All streams in the activeStreams linked-list must have both:
// 1. Data to send, and
// 2. Stream level flow control quota available.
//
// In each iteration of run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the
// activeStreams linked-list.  This results in writing of HTTP2 frames into an
// underlying write buffer.  When there's no more control frames to read from
// controlBuf, loopy flushes the write buffer.  As an optimization, to increase
// the batch size for each flush, loopy yields the processor, once if the batch
// size is too low to give stream goroutines a chance to fill it up.
//
// Upon exiting, if the error causing the exit is not an I/O error, run()
// flushes and closes the underlying connection.  Otherwise, the connection is
// left open to allow the I/O error to be encountered by the reader instead.
func (l *loopyWriter) run() (err error) {defer func() {if l.logger.V(logLevel) {l.logger.Infof("loopyWriter exiting with error: %v", err)}if !isIOError(err) {l.framer.writer.Flush()l.conn.Close()}l.cbuf.finish()}()for {it, err := l.cbuf.get(true)if err != nil {return err}if err = l.handle(it); err != nil {return err}if _, err = l.processData(); err != nil {return err}gosched := truehasdata:for {it, err := l.cbuf.get(false)if err != nil {return err}if it != nil {if err = l.handle(it); err != nil {return err}if _, err = l.processData(); err != nil {return err}continue hasdata}isEmpty, err := l.processData() // 最底层调用了 Go 标准库的 io.Writer::Write(),Writer 是一个接口if err != nil {return err}if !isEmpty {continue hasdata}if gosched {gosched = falseif l.framer.writer.offset < minBatchSize {runtime.Gosched()continue hasdata}}l.framer.writer.Flush()break hasdata}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/225095.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

idea2023解决右键没有Servlet的问题

复制Servlet Class.java中的文件。 回到文件&#xff0c;然后点击小加号 然后输入刚刚复制的东西&#xff1a; 3. 此时右键有servlet。 4. 然后他让你输入下面两个框&#xff1a; JAVAEE TYPE中输入Servlet Class Name 表示你要创建的Servlet类的名称是什么。自己起名字。然后…

手动添加Git Bash Here到右键菜单(超详细)

通过WindowsR快捷键可以打开“运行窗口”&#xff0c;在“窗口”中输入“regedit”&#xff0c;点击“确定”打开注册表。 依次进入HKEY_CLASSES_ROOT —-》 Directory —-》Background —-》 shell 路径为Computer\HKEY_CLASSES_ROOT\Directory\Background\shell 3.在“s…

Python中Requests的深入了解

Requests的深入了解 基本POST请求&#xff08;data参数&#xff09; 1. 最基本post方法 response requests.post("http://www.baidu.com/", data data)2. 传入data数据 对于 POST 请求来说&#xff0c;我们一般需要为它增加一些参数。那么最基本的传参方法可以…

状态的一致性和FlinkSQL

状态一致性 一致性其实就是结果的正确性。精确一次是指数据有可能被处理多次&#xff0c;但是结果只有一个。 三个级别&#xff1a; 最多一次&#xff1a;1次或0次&#xff0c;有可能丢数据至少一次&#xff1a;1次或n次&#xff0c;出错可能会重试 输入端只要可以做到数据重…

mongo分组查询问题以及mongo根据Date类型查询

一、mongo分组查询 mongo中如果只是根据条件查询数据&#xff0c;则只需要&#xff1a; db.getCollection(表名).find({source:{$eq:5}}) 如果根据字段进行分组查询&#xff0c;那么需要用aggregate传一个数组进行查询&#xff0c;如 db.getCollection(表名).find({ "…

[每周一更]-(第27期):HTTP压测工具之wrk

[补充完善往期内容] wrk是一款简单的HTTP压测工具,托管在Github上,https://github.com/wg/wrkwrk 的一个很好的特性就是能用很少的线程压出很大的并发量. 原因是它使用了一些操作系统特定的高性能 io 机制, 比如 select, epoll, kqueue 等. 其实它是复用了 redis 的 ae 异步事…

Android APP 常见概念与 adb 命令

adb 的概念 adb 即 Android Debug Bridge 。在窗口输入 adb 即可显示帮助文档。adb 实际上就是在后台开启一个 server&#xff0c;会接收 adb 的命令然后帮助管理&#xff0c;控制&#xff0c;查看设备的状态、信息等&#xff0c;是开发、测试 Android 相关程序的最常用手段。…

Centos系统pnpm升级报错 ERR_PNPM_NO_GLOBAL_BIN_DIR

在 CentOS 系统中使用 pnpm i -g pnpm 报错&#xff1a;ERR_PNPM_NO_GLOBAL_BIN_DIR Unable to find the global bin directory&#xff0c;折腾半天终于解决了。 完整报错信息 [rootVM-8 test]# pnpm i -g pnpm Nothing to stop. No server is running for the store at /roo…

linux20day 排序sort 字符处理cut cpu使用占比排序 awk文本数据处理

目录 1、排序sort参数用法排序&#xff08;-n&#xff09;从大到小 倒叙&#xff08;-r&#xff09; cpu使用占比排序&#xff08;ps aux --sort -%cpu&#xff09; 2、截取到某个字符串 cut3、awk处理文本文件用法&#xff1a;打印等于 和不等于 1、排序sort 经常用于排序 参…

Spring 的 @Configuration 和 @Component 注解区别

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…

数据分析的基本步骤

了解过数据分析的概念之后&#xff0c;我们再来说下数据分析的常规步骤。 明确目标 首先我们要确定一个目标&#xff0c;即我们要从数据中得到什么。比如我们要看某个指标A随时间的变化趋势&#xff0c;以期进行简单的预测。 数据收集 当确定了目标之后&#xff0c;就有了取…

js逆向-JS加密破解进阶

目录 一、JS逆向进阶一&#xff1a;破解AES加密 &#xff08;一&#xff09;AES对称加密算法原理 &#xff08;二&#xff09;破解AES加密 &#xff08;三&#xff09;实战&#xff1a;发现报告网 二、JS逆向进阶二&#xff1a;破解RSA加密 &#xff08;一&#xff09;RS…

gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架

背景介绍 gRPC 是一种现代开源高性能远程过程调用 &#xff08;RPC&#xff09; 可以在任何环境中运行的框架。它可以有效地连接服务 在数据中心内和数据中心之间&#xff0c;具有对负载平衡、跟踪、 运行状况检查和身份验证。它也适用于最后一英里 分布式计算&#xff0c;用于…

P20类神经网络训练不起来怎么办?- 批次和动量

什么是batchsmall batch 和 large batch 的比较 &#xff1a; large batch 更快&#xff0c;small batch 在训练集和测试集上效果效果更好动量的意义和作用&#xff1a; 类似于物理上多了一点惯性&#xff0c;防止困在鞍点。 动量是之前所有梯度的加权和。 1. batch 是什么 …

高压电气是什么

高压电气 电工电气百科 文章目录 高压电气前言一、高压电气是什么二、高压电气的类别三、高压电气的作用原理总结前言 高压电气在电力系统中起着重要的作用,它能够将电能有效地输送和分配到各个用户,为社会和工业生产提供稳定可靠的电力供应。然而,高压电气系统也需要注意安…

Python【Matplotlib】鼠标单击事件判断点击的是否为图例

直接上代码&#xff1a; import matplotlib.pyplot as plt# 创建一个简单的图表 fig, ax plt.subplots() line, ax.plot([1, 2, 3], labelLine 1) ax.legend(draggableTrue)# 获取图例对象 legend ax.get_legend()# 获取图例的边界框 legend_bbox legend.get_window_exten…

Mr_HJ / form-generator项目文档学习与记录(续)

以后主打超融开源社区 (jiangzhicheng88) - Gitee.com render.js就是对vue的render函数的自己简单定制封装。 render.js实现的功能是将json表单中的__config__.tag解析为具体的vue组件&#xff1b; 正常开发流程我们组件输入的时候会触发组件内的 this.$emit(getValue, val)…

PyQt6 安装Qt Designer

前言&#xff1a;在Python自带的环境下&#xff0c;安装Qt Designer&#xff0c;并在PyCharm中配置designer工具。 在项目开发中&#xff0c;使用Python虚拟环境安装PyQt6-tools时&#xff0c;designer.exe会安装在虚拟环境的目录中&#xff1a;.venv\Lib\site-packages\qt6_a…

【PHP】openssl_encrypt、openssl_decrypt对称加密解密

目录 1.加密解密封装类 2.调用方法 3.打印结果 1.加密解密封装类 <?php namespace app\common\library;/*** AES加解密* Class Client* package common\components\wsd*/ class Aes {const KEY "xxxxxxxxxxxx";const CIPHER "AES-128-CBC";/*** 加…

新时代商业市场:AR技术的挑战与机遇并存

随着科技的不断发展&#xff0c;增强现实&#xff08;AR&#xff09;技术逐渐成为当今社会的一个重要组成部分。AR技术能够将虚拟世界与现实世界相结合&#xff0c;为人们提供更加丰富、多样化的体验。在新时代的社会商业市场中&#xff0c;AR技术也正逐渐被应用于各种商业活动…