milvus querynode启动源码分析

querynode启动源码分析

结构体

// QueryNode implements QueryNode grpc server
// cmd\components\query_node.go
type QueryNode struct {ctx context.Contextsvr *grpcquerynode.Server
}// Server is the grpc server of QueryNode.
type Server struct {querynode   types.QueryNodeComponentwg          sync.WaitGroupctx         context.Contextcancel      context.CancelFuncgrpcErrChan chan errorserverID atomic.Int64grpcServer *grpc.ServeretcdCli *clientv3.Client
}

querynode是一个接口,实现querynode api功能。

func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {wg.Add(1)// clear local storagerootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()queryDataLocalPath := filepath.Join(rootPath, typeutil.QueryNodeRole)cleanLocalDir(queryDataLocalPath)// clear mmap dirmmapDir := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()if len(mmapDir) > 0 {cleanLocalDir(mmapDir)}return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode)
}// creator用NewQueryNode替换
role, err = creator(ctx, factory)

components.NewQueryNode是一个函数。

NewQueryNode()用来创建QueryNode结构体。

// NewQueryNode creates a new QueryNode
func NewQueryNode(ctx context.Context, factory dependency.Factory) (*QueryNode, error) {svr, err := grpcquerynode.NewServer(ctx, factory)if err != nil {return nil, err}return &QueryNode{ctx: ctx,svr: svr,}, nil
}

grpcquerynode.NewServer()产生的是本结构体Server。

// NewServer create a new QueryNode grpc server.
func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error) {ctx1, cancel := context.WithCancel(ctx)s := &Server{ctx:         ctx1,cancel:      cancel,querynode:   qn.NewQueryNode(ctx, factory),grpcErrChan: make(chan error),}return s, nil
}

qn.NewQueryNode()返回一个结构体,是 types.QueryNodeComponen接口的一个实现

执行Run()

Server结构体创建后,调用结构体的Run()方法。

func runComponent[T component](ctx context.Context,localMsg bool,runWg *sync.WaitGroup,creator func(context.Context, dependency.Factory) (T, error),metricRegister func(*prometheus.Registry),
) component {var role Tsign := make(chan struct{})go func() {factory := dependency.NewFactory(localMsg)var err errorrole, err = creator(ctx, factory)if localMsg {paramtable.SetRole(typeutil.StandaloneRole)} else {paramtable.SetRole(role.GetName())}if err != nil {panic(err)}close(sign)// 在这里调用对应组件结构体的Run()方法,这里是QueryNode结构体if err := role.Run(); err != nil {panic(err)}runWg.Done()}()......
}

runComponent是一个包裹函数。

// Run starts service
func (q *QueryNode) Run() error {if err := q.svr.Run(); err != nil {log.Error("QueryNode starts error", zap.Error(err))return err}log.Debug("QueryNode successfully started")return nil
}

Run()方法调用q.svr.Run()方法。srv是grpcquerynode.NewServer()返回的结构体。

进入Run()方法:

// Run initializes and starts QueryNode's grpc service.
func (s *Server) Run() error {if err := s.init(); err != nil {return err}log.Debug("QueryNode init done ...")if err := s.start(); err != nil {return err}log.Debug("QueryNode start done ...")return nil
}

接下来分析s.init()和s.start()方法。

s.init()

// init initializes QueryNode's grpc service.
func (s *Server) init() error {etcdConfig := &paramtable.Get().EtcdCfgParams := &paramtable.Get().QueryNodeGrpcServerCfgif !funcutil.CheckPortAvailable(Params.Port.GetAsInt()) {paramtable.Get().Save(Params.Port.Key, fmt.Sprintf("%d", funcutil.GetAvailablePort()))log.Warn("QueryNode get available port when init", zap.Int("Port", Params.Port.GetAsInt()))}log.Debug("QueryNode", zap.Int("port", Params.Port.GetAsInt()))etcdCli, err := etcd.GetEtcdClient(etcdConfig.UseEmbedEtcd.GetAsBool(),etcdConfig.EtcdUseSSL.GetAsBool(),etcdConfig.Endpoints.GetAsStrings(),etcdConfig.EtcdTLSCert.GetValue(),etcdConfig.EtcdTLSKey.GetValue(),etcdConfig.EtcdTLSCACert.GetValue(),etcdConfig.EtcdTLSMinVersion.GetValue())if err != nil {log.Debug("QueryNode connect to etcd failed", zap.Error(err))return err}s.etcdCli = etcdClis.SetEtcdClient(etcdCli)s.querynode.SetAddress(Params.GetAddress())log.Debug("QueryNode connect to etcd successfully")s.wg.Add(1)// 启动grpc,默认端口为21123go s.startGrpcLoop(Params.Port.GetAsInt())// wait for grpc server loop starterr = <-s.grpcErrChanif err != nil {return err}s.querynode.UpdateStateCode(commonpb.StateCode_Initializing)log.Debug("QueryNode", zap.Any("State", commonpb.StateCode_Initializing))// 调用querynode的初始化方法if err := s.querynode.Init(); err != nil {log.Error("QueryNode init error: ", zap.Error(err))return err}return nil
}

这段可以看出来,创建了etcdCli并赋予给了s.etcdCli。

s.startGrpcLoop()启动grpc端口服务。

最终调用s.querynode.Init()进行初始化,代码位置:internal\querynodev2\server.go

s.querynode是接口类型types.QueryNodeComponent,QueryNodeComponent继承于Component。

type QueryNodeComponent interface {QueryNodeUpdateStateCode(stateCode commonpb.StateCode)SetAddress(address string)GetAddress() stringSetEtcdClient(etcdClient *clientv3.Client)
}// QueryNode is the interface `querynode` package implements
type QueryNode interface {Componentquerypb.QueryNodeServer
}// Component is the interface all services implement
type Component interface {Init() errorStart() errorStop() errorRegister() error
}

接口套接口:

RootCoordComponent -> RootCoord -> Component
DataCoordComponent -> DataCoord -> Component
QueryCoordComponent -> QueryCoord -> Component
ProxyComponent -> Proxy -> Component
QueryNodeComponent -> QueryNode -> Component
IndexNodeComponent -> IndexNode -> Component
DataNodeComponent -> DataNode -> Component

各组件最终的Init()初始化代码路径:

internal\rootcoord\root_coord.go->Init()
internal\datacoord\server.go->Init()
internal\querycoordv2\server.go->Init()
internal\datanode\data_node.go->Init()
internal\indexnode\indexnode.go->Init()
internal\querynodev2\server.go->Init()
internal\proxy\proxy.go->Init()

回过头来继续querynode的init。

// Init function init historical and streaming module to manage segments
func (node *QueryNode) Init() error {var initError errornode.initOnce.Do(func() {// ctx := context.Background()log.Info("QueryNode session info", zap.String("metaPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue()))err := node.initSession()if err != nil {log.Error("QueryNode init session failed", zap.Error(err))initError = errreturn}err = node.initHook()if err != nil {// auto index cannot work if hook init failedif paramtable.Get().AutoIndexConfig.Enable.GetAsBool() {log.Error("QueryNode init hook failed", zap.Error(err))initError = errreturn}}node.factory.Init(paramtable.Get())localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()localChunkManager := storage.NewLocalChunkManager(storage.RootPath(localRootPath))localUsedSize, err := segments.GetLocalUsedSize(localRootPath)if err != nil {log.Warn("get local used size failed", zap.Error(err))initError = errreturn}metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localUsedSize / 1024 / 1024))remoteChunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx)if err != nil {log.Warn("failed to init remote chunk manager", zap.Error(err))initError = errreturn}node.cacheChunkManager, err = storage.NewVectorChunkManager(node.ctx,localChunkManager,remoteChunkManager,paramtable.Get().QueryNodeCfg.CacheMemoryLimit.GetAsInt64(),paramtable.Get().QueryNodeCfg.CacheEnabled.GetAsBool(),)if err != nil {log.Error("failed to init cache chunk manager", zap.Error(err))initError = errreturn}node.vectorStorage, err = node.factory.NewPersistentStorageChunkManager(node.ctx)if err != nil {log.Error("QueryNode init vector storage failed", zap.Error(err))initError = errreturn}log.Info("queryNode try to connect etcd success", zap.String("MetaRootPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue()))schedulePolicy := paramtable.Get().QueryNodeCfg.SchedulePolicyName.GetValue()node.scheduler = tasks.NewScheduler(schedulePolicy,)log.Info("queryNode init scheduler", zap.String("policy", schedulePolicy))node.clusterManager = cluster.NewWorkerManager(func(ctx context.Context, nodeID int64) (cluster.Worker, error) {if nodeID == paramtable.GetNodeID() {return NewLocalWorker(node), nil}sessions, _, err := node.session.GetSessions(typeutil.QueryNodeRole)if err != nil {return nil, err}addr := ""for _, session := range sessions {if session.ServerID == nodeID {addr = session.Addressbreak}}client, err := grpcquerynodeclient.NewClient(ctx, addr, nodeID)if err != nil {return nil, err}return cluster.NewRemoteWorker(client), nil})node.delegators = typeutil.NewConcurrentMap[string, delegator.ShardDelegator]()node.subscribingChannels = typeutil.NewConcurrentSet[string]()node.unsubscribingChannels = typeutil.NewConcurrentSet[string]()node.manager = segments.NewManager()node.loader = segments.NewLoader(node.manager, node.vectorStorage)node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, paramtable.GetNodeID())// init pipeline managernode.pipelineManager = pipeline.NewManager(node.manager, node.tSafeManager, node.dispClient, node.delegators)err = node.InitSegcore()if err != nil {log.Error("QueryNode init segcore failed", zap.Error(err))initError = errreturn}if paramtable.Get().QueryNodeCfg.GCEnabled.GetAsBool() {if paramtable.Get().QueryNodeCfg.GCHelperEnabled.GetAsBool() {action := func(GOGC uint32) {debug.SetGCPercent(int(GOGC))}gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)} else {action := func(uint32) {}gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)}}log.Info("query node init successfully",zap.Int64("queryNodeID", paramtable.GetNodeID()),zap.String("Address", node.address),)})return initError
}

从代码可以看出初始化是在填充QueryNode结构体。

s.start()

启动组件的逻辑。

// start starts QueryNode's grpc service.
func (s *Server) start() error {if err := s.querynode.Start(); err != nil {log.Error("QueryNode start failed", zap.Error(err))return err}if err := s.querynode.Register(); err != nil {log.Error("QueryNode register service failed", zap.Error(err))return err}return nil
}

s.querynode是一个Component接口,实现了 方法Init()、 Start() 、 Stop() 、 Register() 。

Register():向元数据etcd注册。

Start():用来启动组件。

// Start mainly start QueryNode's query service.
func (node *QueryNode) Start() error {node.startOnce.Do(func() {node.scheduler.Start()paramtable.SetCreateTime(time.Now())paramtable.SetUpdateTime(time.Now())mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()mmapEnabled := len(mmapDirPath) > 0node.UpdateStateCode(commonpb.StateCode_Healthy)registry.GetInMemoryResolver().RegisterQueryNode(paramtable.GetNodeID(), node)log.Info("query node start successfully",zap.Int64("queryNodeID", paramtable.GetNodeID()),zap.String("Address", node.address),zap.Bool("mmapEnabled", mmapEnabled),)})return nil
}

node节点都没有standby,coord节点有standby。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/824343.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux安装和使用Android Debug Bridge(ADB)

目录 1、开发环境和工具 2、ADB是什么&#xff1f; 3、安装ADB 3.1、使用包管理器安装 ADB 3.2、手动安装 ADB 4、使用ADB 4.1、连接设备 4.2、执行shell命令 4.3、安装应用程序 4.4、截取屏幕截图 4.5、模拟按键和手势 4.6、上传文件到Android设备 4.7、从Android设备下载文件…

BGP边界网关路由实验(华为)

一&#xff0c;技术简介 BGP&#xff08;边界网关路由协议&#xff09;是一种自治系统&#xff08;AS&#xff09;间的协议&#xff0c;主要用于在不同的AS之间交换路由信息。AS是一个由一组网络设备和路由器组成的网络集合&#xff0c;这些设备可以在一个共同的管理域中协同工…

1 回归:锂电池温度预测top2 代码部分(一) Tabnet

2024 iFLYTEK A.I.开发者大赛-讯飞开放平台 TabNet&#xff1a; 模型也是我在这个比赛一个意外收获&#xff0c;这个模型在比赛之中可用。但是需要GPU资源&#xff0c;否则运行真的是太慢了。后面针对这个模型我会写出如何使用的方法策略。 比赛结束后有与其他两位选手聊天&am…

win2022服务器apache配置https(ssl)真实环境实验(避坑之作)不依赖宝塔小皮等集成环境

本次实验背景&#xff1a; 完全参考官方 https://cloud.tencent.com/document/product/400/4143 文档流程&#xff0c;没有搞定&#xff0c;于是写下避坑之作。 服务器&#xff1a;腾讯云轻量应用服务器 操作系统&#xff1a; Windows Server 2022 DataCenter 64bit CN apache…

李沐45_SSD实现——自学笔记

主体思路&#xff1a; 1.生成一堆锚框 2.根据真实标签为每个锚框打标(类别、偏移、mask) 3.模型为每个锚框做一个预测(类别、偏移) 4.计算上述二者的差异损失&#xff0c;以更新模型weights 先读取一张图像。 它的高度和宽度分别为561和728像素。 %matplotlib inline import …

Photoshop 2024 (ps) v25.6中文 强大的图像处理软件 mac/win

Photoshop 2024 for Mac是一款强大的图像处理软件&#xff0c;专为Mac用户设计。它继承了Adobe Photoshop一贯的优秀功能&#xff0c;并进一步提升了性能和稳定性。 Mac版Photoshop 2024 (ps)v25.6中文激活版下载 win版Photoshop 2024 (ps)v25.6直装版下载 无论是专业的设计师还…

EI Scopus双检索 | 2024年清洁能源与智能电网国际会议(CCESG 2024)

会议简介 Brief Introduction 2024年清洁能源与智能电网国际会议(CCESG 2024) 会议时间&#xff1a;2024年 11月27-29日 召开地点&#xff1a;澳大利亚悉尼 大会官网&#xff1a;CCESG 2024-2024 International Joint Conference on Clean Energy and Smart Grid 由CoreShare科…

m4p转换mp3格式怎么转?3个Mac端应用~

M4P文件格式的诞生伴随着苹果公司引入FairPlay版权管理系统&#xff0c;该系统旨在保护音频的内容。M4P因此而生&#xff0c;成为受到FairPlay系统保护的音频格式&#xff0c;常见于苹果设备的iTunes等平台。 MP3文件格式的多个优点 MP3格式的优点显而易见。首先&#xff0c;其…

k8s之etcd

1.特点&#xff1a; etcd 是云原生架构中重要的基础组件。有如下特点&#xff1a; 简单&#xff1a;安装配置简单&#xff0c;而且提供了 HTTP API 进行交互&#xff0c;使用也很简单键值对存储&#xff1a;将数据存储在分层组织的目录中&#xff0c;如同在标准文件系统中监…

vscode msvc qt环境搭建

自己整了好久都没把环境搞好&#xff0c;后来发现已经有大佬搞好了插件&#xff0c;完全不需要自己整理。 下载如下插件&#xff1a; 第二个qt插件就可以自动帮我们生成工程了。 可惜目前似乎支持win&#xff0c;另外就是debug模式运行后会报qwindowsd.dll插件找不到的错误&a…

【简单讲解下如何用爬虫玩转石墨文档】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

链表OJ - 6(链表分割)

题目描述&#xff08;来源&#xff09; 现有一链表的头指针 ListNode* pHead&#xff0c;给一定值x&#xff0c;编写一段代码将所有小于x的结点排在其余结点之前&#xff0c;且不能改变原来的数据顺序&#xff0c;返回重新排列后的链表的头指针。 思路 创建两个链表&#xff0c…

ChatGPT:引领未来的语言模型革命?

一、引言 随着人工智能技术的不断发展&#xff0c;Chat GPT作为一种自然语言处理技术&#xff0c;已经逐渐渗透到各个领域&#xff0c;具有广泛的应用前景。本文将从多个角度探讨Chat GPT的应用领域及其未来发展趋势。 ChatGPT的语言处理能力超越了以往任何一款人工智能产品。…

Docker一键快速私有化部署(Ollama+Openwebui) +AI大模型(gemma,llama2,qwen)20240417更新

几行命令教你私有化部署自己的AI大模型&#xff0c;每个人都可以有自己的GTP 第一步&#xff1a;安装Docker(如果已经有了可以直接跳第二步) ####下载安装Docker wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O/etc/yum.repos.d/docker-ce.repo##…

STM32 USB虚拟串口

电路原理图 usb部分 晶振部分 usb与单片机连接 配置信息 sys配置信息 rcc配置信息 usb配置信息 虚拟串口配置信息 时钟配置信息 项目配置信息 代码 包含文件 主函数代码 实验效果 修改接收波特率依然可以正常接收&#xff0c;也就是说单片机可以自动适应上位机的波特率设置。…

4.17作业

#include "double_link_list.h" node_p create_double_link_list() //创建双向链表 {node_p H(node_p)malloc(sizeof(node));if(HNULL){printf("空间申请失败\n");return NULL;}H->data0;H->priNULL;H->nextNULL;return H; } node_p create_node…

BUUCTF——[GXYCTF2019]BabyUpload

BUUCTF——[GXYCTF2019]BabyUpload 1.上传嘛&#xff0c;直接丢正常的jpg文件进服务器 2.发现可以正常上传&#xff0c;并且回显出来啦文件上传的路径 /var/www/html/upload/7df22610744ec51e9cb7a8a8eb674374/1111.jpg 3.尝试上传一句话木马 <?php eval($POST[123456]…

HDFS详解(Hadoop)

Hadoop 分布式文件系统&#xff08;Hadoop Distributed File System&#xff0c;HDFS&#xff09;是 Apache Hadoop 生态系统的核心组件之一&#xff0c;它是设计用于存储大规模数据集并运行在廉价硬件上的分布式文件系统。 1. 分布式存储&#xff1a; HDFS 将文件分割成若干块…

「 网络安全常用术语解读 」漏洞利用交换VEX详解

漏洞利用交换&#xff08;Vulnerability Exploitability eXchange&#xff0c;简称VEX&#xff09;是一个信息安全领域的标准&#xff0c;旨在提供关于软件漏洞及其潜在利用的实时信息。根据美国政府发布的用例(PDF)&#xff0c;由美国政府开发的漏洞利用交换(VEX)使供应商和用…

工业电脑在ESOP工作站行业应用

ESOP工作站行业应用 项目背景 E-SOP是实现作业指导书电子化&#xff0c;并统一管理和集中控制的一套管理信息平台。信迈科技的ESOP终端是一款体积小巧功能齐全的高性价比工业电脑&#xff0c;上层通过网络与MES系统连接&#xff0c;下层连接显示器展示作业指导书。ESOP控制终…