Kubeedge:edgecore源码速读

Kubeedge源码版本:v1.15.1

首先,我们从edgehub的start函数看起:

它主要干几件事情:

  1. 初始化证书相关,这里的证书主要用于webskt的连接
  2. 启动edgehub,开启三个协程,分别把云发过来的消息路由到边缘的组件、把边缘组件发过来的消息路由到云、向云上做保活。
func (eh *EdgeHub) Start() {eh.certManager = certificate.NewCertManager(config.Config.EdgeHub, config.Config.NodeName)eh.certManager.Start()for _, v := range GetCertSyncChannel() {v <- trueclose(v)}go eh.ifRotationDone() // 一个用于证书轮换的同步的goroutine,更新完证书之后就重启。for {select { // 用于停掉edgehubcase <-beehiveContext.Done():klog.Warning("EdgeHub stop")returndefault:} // select doneerr := eh.initial() // 加载一些配置到Edgehub结构体的eh.chClient项中if err != nil {klog.Exitf("failed to init controller: %v", err)return}waitTime := time.Duration(config.Config.Heartbeat) * time.Second * 2err = eh.chClient.Init() // 基于前面加载的eh.chClient项建立webskt/quic连接if err != nil {time.Sleep(waitTime) // 如果失败就歇一会再重试continue}// execute hook func after connecteh.pubConnectInfo(true) // 向所有的边缘组件发送云边通道建立的消息go eh.routeToEdge() // 接收 云 发到 边 的所有消息,然后dispatchgo eh.routeToCloud() // 接收 边 发到 云 的所有消息,然后试图发送go eh.keepalive() // 用于向 云 发送保活信号// wait the stop signal// stop authinfo manager/websocket connection<-eh.reconnectChan // 接收重连的信号eh.chClient.UnInit() // 关闭连接// execute hook fun after disconnecteh.pubConnectInfo(false) // 向所有的边缘组件发送云边通道断开的消息// sleep one period of heartbeat, then try to connect cloud hub againklog.Warningf("connection is broken, will reconnect after %s", waitTime.String())time.Sleep(waitTime)time.Sleep(300 * time.Second)// clean channelclean:for {select {case <-eh.reconnectChan:default:break clean}}} // end for
}

然后主要查看这几个函数:

go eh.routeToEdge() // 接收 云 发到 边 的所有消息,然后给handler进行处理
go eh.routeToCloud() // 接收 边 发到 云 的所有消息,然后试图发送
go eh.keepalive() // 用于向 云 发送保活信号
首先查看go eh.routeToEdge()方法

这是一个死循环,不断地从云端获取消息(如果暂时没有消息就会被阻塞掉),拿到消息之后就会被eh.dispatch(message)方法进行处理。逻辑如下:

func (eh *EdgeHub) routeToEdge() {for {select {case <-beehiveContext.Done():klog.Warning("EdgeHub RouteToEdge stop")returndefault:} // end selectmessage, err := eh.chClient.Receive() // 接收webskt传来的云端消息if err != nil {klog.Errorf("websocket read error: %v", err)eh.reconnectChan <- struct{}{}return}klog.V(4).Infof("[edgehub/routeToEdge] receive msg from cloud, msg:% +v", message)err = eh.dispatch(message) // 进行处理if err != nil {klog.Errorf("failed to dispatch message, discard: %v", err)}}
}

相应地,我们看一下dispatch函数的逻辑,它包装了一个handler,这个handler会对从云端发往edgehub的消息进行处理。

func (eh *EdgeHub) dispatch(message model.Message) error {// handler for msg.err := msghandler.ProcessHandler(message, eh.chClient)if err != nil {return err}return nil
}

继续看一下ProcessHandler,对于从云端发往edgehub的消息,会先过滤再处理:

func ProcessHandler(message model.Message, client clients.Adapter) error {lock.RLock()defer lock.RUnlock()for _, handle := range Handlers {if handle.Filter(&message) {err := handle.Process(&message, client)if err != nil {return fmt.Errorf("...")}return nil}}return fmt.Errorf("...")
}

其中,这里面的Filter函数说是过滤一些东西,但是个人目前来看就是什么东西都不过滤(但是,不排除在后来的版本中添加更精细的过滤逻辑)

func (*defaultHandler) Filter(message *model.Message) bool {group := message.GetGroup()return group == messagepkg.ResourceGroupName || group == messagepkg.TwinGroupName ||group == messagepkg.FuncGroupName || group == messagepkg.UserGroupName // 基本上就是返回true了
}

在过滤完云端发到edgehub的消息之后,edgehub会在process函数里对消息进行处理,主要就是用beeive做一下消息转发操作。

func (*defaultHandler) Process(message *model.Message, clientHub clients.Adapter) error {group := message.GetGroup()md := ""switch group {case messagepkg.ResourceGroupName:md = modules.MetaGroupcase messagepkg.TwinGroupName:md = modules.TwinGroupcase messagepkg.FuncGroupName:md = modules.MetaGroupcase messagepkg.UserGroupName:md = modules.BusGroup}// TODO: just for a temporary fix.// The code related to device twin message transmission will be reconstructed//  by using sendSync function instead of send function.if group == messagepkg.TwinGroupName {beehiveContext.SendToGroup(md, *message)return nil}isResponse := isSyncResponse(message.GetParentID())if isResponse {beehiveContext.SendResp(*message)return nil}if group == messagepkg.UserGroupName && message.GetSource() == "router_eventbus" {beehiveContext.Send(modules.EventBusModuleName, *message)} else if group == messagepkg.UserGroupName && message.GetSource() == "router_servicebus" {beehiveContext.Send(modules.ServiceBusModuleName, *message)} else {beehiveContext.SendToGroup(md, *message)}return nil
}
查看go eh.routeToCloud()

eh.routeToCloud()函数主要是从边缘的各个组件拿到beehive格式的消息,然后走webskt/quic发送到cloudhub。

使用一个死循环不断地拿beehive中的存储的消息。考虑到消息的数量可能较多,这里对消息处理进行了限速。

func (eh *EdgeHub) routeToCloud() {for {select {case <-beehiveContext.Done():klog.Warning("EdgeHub RouteToCloud stop")returndefault:} // end selectmessage, err := beehiveContext.Receive(modules.EdgeHubModuleName)if err != nil {klog.Errorf("failed to receive message from edge: %v", err)time.Sleep(time.Second)continue}err = eh.tryThrottle(message.GetID()) // 用来做流量速率控制的函数if err != nil {klog.Errorf("msgID: %s, client rate limiter returned an error: %v ", message.GetID(), err)continue}// post message to cloud huberr = eh.sendToCloud(message) // 发送到云上而已if err != nil {klog.Errorf("failed to send message to cloud: %v", err)eh.reconnectChan <- struct{}{}return}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/2889.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蝴蝶书--ChatGPT基础科普

temperature的参数调整输出的概率分布&#xff0c;这个参数值越大&#xff0c;分布就看起来越平滑&#xff0c;也就是高概率和低概率的差距拉小了&#xff08;对输出不那么确定&#xff09; Top-P在累计概率超过P的词里进行选择&#xff0c;对于概率分布比较均匀的情况&#x…

视频教程下载:用ChatGPT的 API 开发AI应用指南

通过这门关于 OpenAI API 和 ChatGPT API 的全面课程&#xff0c;在您的应用中释放人工智能的力量。随着人工智能技术的快速发展&#xff0c;比以往任何时候都更重要的是保持领先地位&#xff0c;并为您的项目利用这些尖端工具。在本课程中&#xff0c;您将深入了解人工智能驱动…

ADB 命令大全

Case1&#xff1a;报错Remote couldnt create file&#xff1a;Read-only file system 输入 adb disable-verity adb reboot adb root adb remount Case2&#xff1a;/system/bin/sh: cant create C:xxx.txt: Read-only file system Android设备的文件系统是基于Linux的&…

物联网硬件设计开发全攻略:十大关键阶段深度解析

为物联网应用设计开发高效稳定的硬件系统本身是一项既复杂又精细的艰巨任务。看似小巧的物联网设备一般由软件、固件和硬件组件组成&#xff0c;其中&#xff0c;硬件组件更是占据了约80%的成本与开发挑战。那么&#xff0c;为何硬件部分如此棘手&#xff1f;在这篇文章中&…

x汽车登陆网站登陆rsa加密逆向

声明&#xff1a; 本文章内容仅供学习交流&#xff0c;不用于其他其他任何目的&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的一切后果均与作者无关&#xff0c; 各位看官好哇&#xff0c;今天给大家带来一篇web自动化逆向的文章&#xff0c;如下图当前我…

C++ 几句话彻底点通虚表

#include <iostream>using namespace std;class Base { public:virtual void show() // 声明虚函数{cout << "Base" << endl;} };class Derived : public Base { public:void show() override // 覆盖虚函数{cout << "Derived" &l…

芯科科技大大简化面向无电池物联网的能量采集产品的开发

芯科科技推出其迄今最高能量效率且支持能量采集功能的无线SoC 中国&#xff0c;北京 – 2024年4月22日 – 致力于以安全、智能无线连接技术&#xff0c;建立更互联世界的全球领导厂商Silicon Labs&#xff08;亦称“芯科科技”&#xff0c;NASDAQ&#xff1a;SLAB&#xff09;…

Day1: 5道C++ 面向对象高频题整理

1、什么是类&#xff1f; 在C中&#xff0c;类是一种用户定义的数据类型&#xff0c;它可以包含数据成员和函数成员。数据成员用于存储与类相关的状态&#xff0c;而函数成员可以定义对这些数据进行操作的方法。可以把类想象为一个蓝图&#xff0c;根据这个蓝图可以创建对象&am…

超星图书转成PDF格式

转为pdf 为避免浪费您的时间&#xff0c;本篇转载文章不值得花费您的宝贵时间阅读 方法一 感谢医学插画动画杜鹏 Roison An两位提供的方法&#xff0c;经试验后简化了一下&#xff0c;得出以下方法:1、使用超星打开你想要转换的图书2、依次打开本书的所有页面&#xff0c;不要…

Property ‘auth‘ does not exist on type ‘AGCApi‘.

Property ‘auth’ does not exist on type ‘AGCApi’. 解决 清理项目重新运行模拟器就可以了

程序员开发必备,开发资源资料分享【4】

第4部分内容 130-100051801-专栏课-罗剑锋-罗剑锋的 C实战笔记&#xff08;完结&#xff09;提取码&#xff1a; 131-100051901-专栏课-陈亦峰-互联网人的英语私教课&#xff08;完结&#xff09;提取码&#xff1a; 132-100051101-视频课-程超-分布式缓存高手课&#xff08…

CentOS-7安装clickhouse并允许其他主机登录

一、通用设置 1、配置主机名 hostnamectl set-hostname --static 主机名2、修改hosts文件 vim /etc/hosts 输入&#xff1a; 192.168.15.129 master 192.168.15.133 node1 192.168.15.134 node2 192.168.15.136 node33、 保持服务器之间时间同步 yum install -y ntpdate &…

Java | Leetcode Java题解之第42题接雨水

题目&#xff1a; 题解&#xff1a; class Solution {public int trap(int[] height) {int n height.length;if (n 0) {return 0;}int[] leftMax new int[n];leftMax[0] height[0];for (int i 1; i < n; i) {leftMax[i] Math.max(leftMax[i - 1], height[i]);}int[] …

用了Keras来构建一个卷积神经网络对MNIST数据集进行分类

首先导入所需库和模块&#xff0c;然后加载MNIST手写数字数据集并对数据进行预处理。接着定义了一个基于卷积神经网络&#xff08;CNN&#xff09;的模型&#xff0c;该模型包含多个卷积层、最大池化层以及Dropout层作为正则化手段。模型最后接一个全连接层作为输出层&#xff…

云南旅游攻略

丽江景点 Day1 ——丽江古城 丽江古城是一个充满文化和历史的地方&#xff0c;拥有丰富的景点和活动。 推荐游玩&#xff1a; 参观标志性建筑&#xff1a;大水车是丽江古城的标志性建筑&#xff0c;可以在这里拍照留念。 探索中心广场&#xff1a;四方街是古城的中心&#xf…

【第6节】Lagent AgentLego 智能体应用搭建

目录 1 基础课程2 安装环境2.1 教程要求2.2 安装 Lagent 和 AgentLego 3 实践操作3.1 Lagent&#xff1a;轻量级智能体框架3.1.1 Lagent Web Demo 使用3.1.2 用 Lagent 自定义工具 3.2 AgentLego&#xff1a;组装智能体“乐高”3.2.1 AgentLego 直接使用部分3.2.2 AgentLego We…

C++笔记:类和对象(一)->封装

类和对象 认识类和对象 先来回忆一下C语言中的类型和变量&#xff0c;类型就像是定义了数据的规则&#xff0c;而变量则是根据这些规则来实际存储数据的容器。类是我们自己定义的一种数据类型&#xff0c;而对象则是这种数据类型的一个具体实例。类就可以理解为类型&#xff0c…

从图灵奖看计算中的随机性与伪随机性

从图灵奖看计算中的随机性与伪随机性 目录 从图灵奖看计算中的随机性与伪随机性 一、引言 二、随机性的本质与应用 三、图灵奖得主对随机性的研究 四、伪随机性的应用 五、案例研究&#xff1a;伪随机数生成器的发展 六、最佳实践 一、引言 在计算机科学的广阔天地中&…

Android Native Hook: 原理、方案对比与具体实现

文章目录 一、原理二、方案对比三、具体实现3.1 Inline Hook3.2 PLT/GOT Hook 四、实践案例&#xff1a;在Android应用中Hook open 函数4.1 Inline Hook实现4.2 PLT/GOT Hook实现 五、实践技巧和优化建议六、总结 在Android开发中&#xff0c;Hook技术是一种常用的技巧&#xf…

二维码存储图片如何实现?相册二维码的制作技巧

如何将照片生成二维码后存储展示&#xff1f;现在很多人会将图片生成二维码以后&#xff0c;用于分享或者储存的用途&#xff0c;减少个人内存的占用量&#xff0c;而且分享照片也会更加的方便&#xff0c;只需要扫描二维码就可以让其他人查看图片。 想要制作图片二维码的步骤…