写点什么

KubeEdge 源码分析之(四)edgehub

  • 2019-12-17
  • 本文字数:4137 字

    阅读完需:约 14 分钟

KubeEdge源码分析之(四)edgehub

前言


本系列的源码分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上进行的。


cloudcore 部分的源码分析是在 kubeedge 源码分析系列之整体架构基础上展开的,如果没有阅读过 kubeedge 源码分析系列之整体架构,直接阅读本文,会感觉比较突兀。


本文对 edgecore 的 edgehub 模块进行剖析,edgehub 作为 edge 部分与 cloud 部分进行交互的门户,有必要将 edgehub 相关内容彻底分析清楚,为使用过程中的故障排查和未来的功能扩展与性能优化都会有很大的帮助。对 edgehub 的剖析,具体包括如下几部分:


  1. edgehub 的 struct 调用链剖析

  2. edgehub 的具体逻辑剖析


edgehub 的 struct 组成剖析


从 edgecore 模块注册函数入手:


kubeedge/edge/cmd/edgecore/app/server.go


// registerModules register all the modules started in edgecore


func registerModules() {


devicetwin.Register()



}


进入 registerModules()函数中的 devicetwin.Register(),具体如下:


kubeedge/edge/pkg/devicetwin/devicetwin.go


// Register register edgehub


func Register() {


core.Register(&EdgeHub{


controller: NewEdgeHubController(),


})


}


顺着 Register()函数中 EdgeHub struct 的实例化语句进入 EdgeHub struct 定义:


kubeedge/edge/pkg/edgehub/module.go


//EdgeHub defines edgehub object structure


type EdgeHub struct {


context *context.Context


controller *Controller


}


EdgeHub struct 中包含*context.Context 和*Controller 两个属性:


  • *context.Context 在 kubeedge 源码分析系列之 cloudcore 中,笔者已经分析过*context.Context,它是一个基于 go-channels 的消息框架,edgecore 用它作为各功能模块之间通信的消息管道。

  • Controller edgehub 的主要功能载体;


进入 Controller struct 的定义:


kubeedge/edge/pkg/edgehub/controller.go


//Controller is EdgeHub controller object


type Controller struct {


context *context.Context


chClient clients.Adapter


config *config.ControllerConfig


stopChan chan struct{}


syncKeeper map[string]chan model.Message


keeperLock sync.RWMutex


}


从 Controller struct 的定义可以确定,Controller struct 是 edgehub 核心功能载体没错了。


到此,edgehub 的 struct 组成剖析就结束了,接下来剖析 edgehub 的具体逻辑。


edgehub 的具体逻辑剖析


回到 edgehub 的注册函数,开始剖析 edgehub 相关的逻辑:


kubeedge/edge/pkg/edgehub/module.go


// Register register edgehub


func Register() {


core.Register(&EdgeHub{


controller: NewEdgeHubController(),


})


}


在 Register()函数中对 EdgeHub struct 的初始化只是对 EdgeHub struct 中的 controller 进行了初始化,进入 controller 的初始化函数:


kubeedge/edge/pkg/edgehub/controller.go


//NewEdgeHubController creates and returns a EdgeHubController object


func NewEdgeHubController() *Controller {


return &Controller{


config: &config.GetConfig().CtrConfig,


stopChan: make(chan struct{}),


syncKeeper: make(map[string]chan model.Message),


}


}


NewEdgeHubController()函数中嵌套了一个获取配置信息的函数调用:


config: &config.GetConfig().CtrConfig


进入 config.GetConfig()函数定义:


kubeedge/edge/pkg/edgehub/config/config.go


var edgeHubConfig EdgeHubConfig



//GetConfig returns the EdgeHub configuration object


func GetConfig() *EdgeHubConfig {


return &edgeHubConfig


}


GetConfig()函数只返回了 &edgeHubConfig,而 edgeHubConfig 是一个 EdgeHubConfig struct 类型的全局变量,至于该变量是在哪里被赋值,怎么赋值的,暂且立个疑问 flag:


EdgeHubConfig 赋值疑问


到此,EdgeHub struct 的初始化就告一段落了,下面分析 EdgeHub 的启动逻辑:


kubeedge/edge/pkg/edgehub/module.go


//Start sets context and starts the controller


func (eh *EdgeHub) Start(c *context.Context) {


eh.context = c


eh.controller.Start©


}


EdgeHub 启动函数 Start(…)只做了 2 件事:


1. 接收并存储传入的消息管道


eh.context = c


2. 启动 EdgeHub 的 controller


eh.controller.Start©


由前面的分析可知 EdgeHub 的 controller 作为 EdgeHub 功能的主要载体,其启动函数也必将囊括 EdgeHub 绝大部分启动逻辑,继续进入 controller 的启动函数定义:


kubeedge/edge/pkg/edgehub/controller.go


//Start will start EdgeHub


func (ehc *Controller) Start(ctx *context.Context) {


config.InitEdgehubConfig()


for {


err := ehc.initial(ctx)



err = ehc.chClient.Init()



// execute hook func after connect


ehc.pubConnectInfo(true)


go ehc.routeToEdge()


go ehc.routeToCloud()


go ehc.keepalive()


// wait the stop singal


// stop authinfo manager/websocket connection


<-ehc.stopChan


ehc.chClient.Uninit()


// execute hook fun after disconnect


ehc.pubConnectInfo(false)


// sleep one period of heartbeat, then try to connect cloud hub again


time.Sleep(ehc.config.HeartbeatPeriod * 2)


// clean channel


clean:


for {


select {


case <-ehc.stopChan:


default:


break clean


}


}


}


}


从 Controller 的启动函数 Start(…)的定义,可以清楚地看到,其中包含了 EdgeHub 的初始化、各种业务 go routine 的启动和最后的退出清理,下面逐个深入剖析:


初始化 EdgehubConfig


config.InitEdgehubConfig()


进入 config.InitEdgehubConfig()函数定义:


kubeedge/edge/pkg/edgehub/config/config.go


// InitEdgehubConfig init edgehub config


func InitEdgehubConfig() {


err := getControllerConfig()



if edgeHubConfig.CtrConfig.Protocol == protocolWebsocket {


err = getWebSocketConfig()



} else if edgeHubConfig.CtrConfig.Protocol == protocolQuic {


err = getQuicConfig()



} else {



}


}


InitEdgehubConfig()函数首先通过 err := getControllerConfig()获得 EdgeHub Controller 的配置信息,然后通过获得的配置信息中的 Protocol 字段来判断是哪个协议,最后根据判断结果获取相应的协议绑定的配置信息或报错。


针对以上获取配置的操作,重点分析获得 EdgeHub Controller 的配置信息,进入 getControllerConfig():


kubeedge/edge/pkg/edgehub/config/config.go


var edgeHubConfig EdgeHubConfig



func getControllerConfig() error {


protocol, err := config.CONFIG.GetValue(“edgehub.controller.protocol”).ToString()



edgeHubConfig.CtrConfig.Protocol = protocol


heartbeat, err := config.CONFIG.GetValue(“edgehub.controller.heartbeat”).ToInt()



edgeHubConfig.CtrConfig.HeartbeatPeriod = time.Duration(heartbeat) * time.Second


projectID, err := config.CONFIG.GetValue(“edgehub.controller.project-id”).ToString()



edgeHubConfig.CtrConfig.ProjectID = projectID


nodeID, err := config.CONFIG.GetValue(“edgehub.controller.node-id”).ToString()



edgeHubConfig.CtrConfig.NodeID = nodeID


return nil


}


getControllerConfig()获取 edgehub.controller.*相关的配置信息并赋值给变量 edgeHubConfig,到此前面立的“EdgeHubConfig 赋值疑问”flag 也得到了解答。


EdgeHub Controller 初始化


err := ehc.initial(ctx)


进入 ehc.initial(…)函数定义:


kubeedge/edge/pkg/edgehub/controller.go


func (ehc *Controller) initial(ctx *context.Context) (err error) {


config.GetConfig().WSConfig.URL, err = bhconfig.CONFIG.GetValue(“edgehub.websocket.url”).ToString()



cloudHubClient, err := clients.GetClient(ehc.config.Protocol, config.GetConfig())



ehc.context = ctx


ehc.chClient = cloudHubClient


return nil


}


  1. 第一行单独获取 edgehub.websocket.url 感觉在前面“初始化 EdgehubConfig“中的 websocket 配置信息初始化部分重复,在此立一个源码问题 flag:

  2. 获取 websocket 配置信息重复

  3. 获取 cloudhub client


cloudHubClient, err := clients.GetClient(ehc.config.Protocol, config.GetConfig())


进入 clients.GetClient(…)定义:


kubeedge/edge/pkg/edgehub/factory.go


//GetClient returns an Adapter object with new web socket


func GetClient(clientType string, config *config.EdgeHubConfig) (Adapter, error) {


switch clientType {


case ClientTypeWebSocket:


websocketConf := wsclient.WebSocketConfig{



}


return wsclient.NewWebSocketClient(&websocketConf), nil


case ClientTypeQuic:


quicConfig := quicclient.QuicConfig{



}


return quicclient.NewQuicClient(&quicConfig), nil


default:


klog.Errorf(“Client type: %s is not supported”, clientType)


}


return nil, ErrorWrongClientType


}


从 GetClient(…)函数定义可以知道,该函数定义了 ClientTypeWebSocket、ClientTypeQuic 两种 client 类型,两者都实现了 Adapter interface,下面遇到 Adapter 类型的 client 变量时,记得对应此处的 ClientTypeWebSocket、ClientTypeQuic。


cloud client 初始化


err = ehc.chClient.Init()


ehc.chClient.Init()函数对应”获取 cloudhub client“中 ClientTypeWebSocket、ClientTypeQuic 的 Init()方法,想要了解 Init()具体做了哪些事情,感兴趣的同学可以在本文的基础上自行剖析。


1. 向 edgecore 各模块广播已经连接成功的消息


// execute hook func after connect


ehc.pubConnectInfo(true)


2. 将从 cloud 部分收到的消息转发给指定 edge 部分的指定模块


go ehc.routeToEdge()


3. 将从 edge 部分的消息转发给 cloud 部分


go ehc.routeToCloud()


4. 向 cloud 部分发送心跳信息


go ehc.keepalive()


5. 剩下的步骤都是在 edgehub 模块退出时的一些清理操作。


到此 edgecore 组件的 edgehub 模块的剖析就结束了,由于篇幅原因,很多思路比较清楚的就没有展开分析,感兴趣的同学可以在本文的基础上自习剖析。


免费直播课


《KubeEdge 技术详解与实战》


今晚 8:00 开播


第 5 课:KubeEdge EdgeMesh 设计原理


直播链接


https://huaweicloud.bugu.mudu.tv/watch/rm2jzlo5


作者 | 之江实验室端边云操作系统团队


原文链接:http://suo.im/6aIHSZ


2019-12-17 14:312039

评论

发布
暂无评论
发现更多内容

SpringMVC还是Spring WebFlux?谁是下一代的Java程序员技术栈?

程序员小毕

Java spring 程序员 后端 springmvc

数字先锋| 药品信息何处有?尽在标识编码处

天翼云开发者社区

重磅 | 九科信息成功入选中国交通建设集团财务云(RPA)项目

九科Ninetech

PM&PMO汇报工作的5大技巧,学会了让老板眼前一亮!

PMO实践

PMO 年终报告 年终总结 项目经理

天翼云斩获2022全球分布式云大会两项大奖

天翼云开发者社区

案例 | 九科信息久其报表RPA项目助力某大型央企财务部门提质增效

九科Ninetech

功能上新|使用 Excel 低门槛进行指标分析!

Kyligence

数据分析 指标管理

作为一个研发凭什么花大量时间修安全漏洞?

墨菲安全

安全 开发

华为云发布CodeArts Req需求管理工具,让需求管理化繁为简

IT科技苏辞

站在2023起跑线,政企数字化如何深入“核心地带”?

脑极体

云渲染时可以关机吗_云渲染电脑可以关闭吗?

Renderbus瑞云渲染农场

云渲染

HTTP其他首部字段

穿过生命散发芬芳

HTTP 12月月更

32篇年度最佳AI论文;Python编译器Codon开源;ChatGPT的前世今生

OneFlow

人工智能 深度学习 大模型

MySQL分库分表,可能真的要退出历史舞台了!

Java永远的神

MySQL 程序员 后端 架构师 分布分表

架构实战营第 10 期 - 模块三作业:外包学生管理系统详细架构设计文档

kaizen

「架构实战营」

TSDB在油气田勘探开发领域的应用

CnosDB

IoT 时序数据库 开源社区 CnosDB infra

直播倒计时1天!“基于AIOps的全面可观测性网络研讨会”与你不见不散

博睿数据

根因分析 直播 智能运维 博睿数据

华为时习知,让企业培训更简单!

IT科技苏辞

5分钟搞懂Jenkins分布式架构

俞凡

架构 DevOps cicd 最佳实践

2022年中国第三方输入法发展分析

易观分析

报告 输入法 语音输入

安全可信| 安全与高效兼得?天翼云EasyCoding敏捷开发平台来了!

天翼云开发者社区

Syscoin宣布与Web3孵化器WEconomy建立长期合作伙伴关系

100DAO 加速计划

DAO #Syscoin #区块链 #Web3

软件测试面试真题 | web自动化关闭浏览器,quit()和close()的区别

测试人

软件测试 面试题 自动化测试 测试开发 web测试

阿里云对话 Tapdata:以秒级响应速度,为企业提供实时数据服务

tapdata

ETL 实时数据 DaaS 现代数据栈

【从零开始学爬虫】采集全国高校导师数据

前嗅大数据

数据采集 爬虫教程 爬虫入门

新茶饮 200+ 门店优化库存成本,需要几个数据分析师?

Kyligence

数据分析 指标中台

软件测试面试真题 | 需求评审中从几个方面发现问题

测试人

软件测试 面试题 自动化测试 测试开发 需求评审

案例 | 九科信息为某大型科研单位设计财务系统科目预警RPA

九科Ninetech

外包学生管理系统架构文档

白杨

数字先锋| 活起来、动起来、用起来!“海南模式”让数据要素发挥更大价值

天翼云开发者社区

小数据治理靠“人工”,大数据治理靠“智能”

用友BIP

KubeEdge源码分析之(四)edgehub_云原生_华为云原生团队_InfoQ精选文章