HarmonyOS开发者限时福利来啦!最高10w+现金激励等你拿~ 了解详情
写点什么

KubeEdge 源码分析之(四)edgehub

  • 2019-12-17
  • 本文字数:4137 字

    阅读完需:约 14 分钟

KubeEdge源码分析之(四)edgehub

前言


本系列的源码分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上进行的。


cloudcore 部分的源码分析是在 kubeedge 源码分析系列之整体架构基础上展开的,如果没有阅读过 kubeedge 源码分析系列之整体架构,直接阅读本文,会感觉比较突兀。


本文对 edgecore 的 edgehub 模块进行剖析,edgehub 作为 edge 部分与 cloud 部分进行交互的门户,有必要将 edgehub 相关内容彻底分析清楚,为使用过程中的故障排查和未来的功能扩展与性能优化都会有很大的帮助。对 edgehub 的剖析,具体包括如下几部分:


  1. edgehub 的 struct 调用链剖析

  2. edgehub 的具体逻辑剖析


edgehub 的 struct 组成剖析


从 edgecore 模块注册函数入手:


kubeedge/edge/cmd/edgecore/app/server.go


// registerModules register all the modules started in edgecore


func registerModules() {


devicetwin.Register()



}


进入 registerModules()函数中的 devicetwin.Register(),具体如下:


kubeedge/edge/pkg/devicetwin/devicetwin.go


// Register register edgehub


func Register() {


core.Register(&EdgeHub{


controller: NewEdgeHubController(),


})


}


顺着 Register()函数中 EdgeHub struct 的实例化语句进入 EdgeHub struct 定义:


kubeedge/edge/pkg/edgehub/module.go


//EdgeHub defines edgehub object structure


type EdgeHub struct {


context *context.Context


controller *Controller


}


EdgeHub struct 中包含*context.Context 和*Controller 两个属性:


  • *context.Context 在 kubeedge 源码分析系列之 cloudcore 中,笔者已经分析过*context.Context,它是一个基于 go-channels 的消息框架,edgecore 用它作为各功能模块之间通信的消息管道。

  • Controller edgehub 的主要功能载体;


进入 Controller struct 的定义:


kubeedge/edge/pkg/edgehub/controller.go


//Controller is EdgeHub controller object


type Controller struct {


context *context.Context


chClient clients.Adapter


config *config.ControllerConfig


stopChan chan struct{}


syncKeeper map[string]chan model.Message


keeperLock sync.RWMutex


}


从 Controller struct 的定义可以确定,Controller struct 是 edgehub 核心功能载体没错了。


到此,edgehub 的 struct 组成剖析就结束了,接下来剖析 edgehub 的具体逻辑。


edgehub 的具体逻辑剖析


回到 edgehub 的注册函数,开始剖析 edgehub 相关的逻辑:


kubeedge/edge/pkg/edgehub/module.go


// Register register edgehub


func Register() {


core.Register(&EdgeHub{


controller: NewEdgeHubController(),


})


}


在 Register()函数中对 EdgeHub struct 的初始化只是对 EdgeHub struct 中的 controller 进行了初始化,进入 controller 的初始化函数:


kubeedge/edge/pkg/edgehub/controller.go


//NewEdgeHubController creates and returns a EdgeHubController object


func NewEdgeHubController() *Controller {


return &Controller{


config: &config.GetConfig().CtrConfig,


stopChan: make(chan struct{}),


syncKeeper: make(map[string]chan model.Message),


}


}


NewEdgeHubController()函数中嵌套了一个获取配置信息的函数调用:


config: &config.GetConfig().CtrConfig


进入 config.GetConfig()函数定义:


kubeedge/edge/pkg/edgehub/config/config.go


var edgeHubConfig EdgeHubConfig



//GetConfig returns the EdgeHub configuration object


func GetConfig() *EdgeHubConfig {


return &edgeHubConfig


}


GetConfig()函数只返回了 &edgeHubConfig,而 edgeHubConfig 是一个 EdgeHubConfig struct 类型的全局变量,至于该变量是在哪里被赋值,怎么赋值的,暂且立个疑问 flag:


EdgeHubConfig 赋值疑问


到此,EdgeHub struct 的初始化就告一段落了,下面分析 EdgeHub 的启动逻辑:


kubeedge/edge/pkg/edgehub/module.go


//Start sets context and starts the controller


func (eh *EdgeHub) Start(c *context.Context) {


eh.context = c


eh.controller.Start©


}


EdgeHub 启动函数 Start(…)只做了 2 件事:


1. 接收并存储传入的消息管道


eh.context = c


2. 启动 EdgeHub 的 controller


eh.controller.Start©


由前面的分析可知 EdgeHub 的 controller 作为 EdgeHub 功能的主要载体,其启动函数也必将囊括 EdgeHub 绝大部分启动逻辑,继续进入 controller 的启动函数定义:


kubeedge/edge/pkg/edgehub/controller.go


//Start will start EdgeHub


func (ehc *Controller) Start(ctx *context.Context) {


config.InitEdgehubConfig()


for {


err := ehc.initial(ctx)



err = ehc.chClient.Init()



// execute hook func after connect


ehc.pubConnectInfo(true)


go ehc.routeToEdge()


go ehc.routeToCloud()


go ehc.keepalive()


// wait the stop singal


// stop authinfo manager/websocket connection


<-ehc.stopChan


ehc.chClient.Uninit()


// execute hook fun after disconnect


ehc.pubConnectInfo(false)


// sleep one period of heartbeat, then try to connect cloud hub again


time.Sleep(ehc.config.HeartbeatPeriod * 2)


// clean channel


clean:


for {


select {


case <-ehc.stopChan:


default:


break clean


}


}


}


}


从 Controller 的启动函数 Start(…)的定义,可以清楚地看到,其中包含了 EdgeHub 的初始化、各种业务 go routine 的启动和最后的退出清理,下面逐个深入剖析:


初始化 EdgehubConfig


config.InitEdgehubConfig()


进入 config.InitEdgehubConfig()函数定义:


kubeedge/edge/pkg/edgehub/config/config.go


// InitEdgehubConfig init edgehub config


func InitEdgehubConfig() {


err := getControllerConfig()



if edgeHubConfig.CtrConfig.Protocol == protocolWebsocket {


err = getWebSocketConfig()



} else if edgeHubConfig.CtrConfig.Protocol == protocolQuic {


err = getQuicConfig()



} else {



}


}


InitEdgehubConfig()函数首先通过 err := getControllerConfig()获得 EdgeHub Controller 的配置信息,然后通过获得的配置信息中的 Protocol 字段来判断是哪个协议,最后根据判断结果获取相应的协议绑定的配置信息或报错。


针对以上获取配置的操作,重点分析获得 EdgeHub Controller 的配置信息,进入 getControllerConfig():


kubeedge/edge/pkg/edgehub/config/config.go


var edgeHubConfig EdgeHubConfig



func getControllerConfig() error {


protocol, err := config.CONFIG.GetValue(“edgehub.controller.protocol”).ToString()



edgeHubConfig.CtrConfig.Protocol = protocol


heartbeat, err := config.CONFIG.GetValue(“edgehub.controller.heartbeat”).ToInt()



edgeHubConfig.CtrConfig.HeartbeatPeriod = time.Duration(heartbeat) * time.Second


projectID, err := config.CONFIG.GetValue(“edgehub.controller.project-id”).ToString()



edgeHubConfig.CtrConfig.ProjectID = projectID


nodeID, err := config.CONFIG.GetValue(“edgehub.controller.node-id”).ToString()



edgeHubConfig.CtrConfig.NodeID = nodeID


return nil


}


getControllerConfig()获取 edgehub.controller.*相关的配置信息并赋值给变量 edgeHubConfig,到此前面立的“EdgeHubConfig 赋值疑问”flag 也得到了解答。


EdgeHub Controller 初始化


err := ehc.initial(ctx)


进入 ehc.initial(…)函数定义:


kubeedge/edge/pkg/edgehub/controller.go


func (ehc *Controller) initial(ctx *context.Context) (err error) {


config.GetConfig().WSConfig.URL, err = bhconfig.CONFIG.GetValue(“edgehub.websocket.url”).ToString()



cloudHubClient, err := clients.GetClient(ehc.config.Protocol, config.GetConfig())



ehc.context = ctx


ehc.chClient = cloudHubClient


return nil


}


  1. 第一行单独获取 edgehub.websocket.url 感觉在前面“初始化 EdgehubConfig“中的 websocket 配置信息初始化部分重复,在此立一个源码问题 flag:

  2. 获取 websocket 配置信息重复

  3. 获取 cloudhub client


cloudHubClient, err := clients.GetClient(ehc.config.Protocol, config.GetConfig())


进入 clients.GetClient(…)定义:


kubeedge/edge/pkg/edgehub/factory.go


//GetClient returns an Adapter object with new web socket


func GetClient(clientType string, config *config.EdgeHubConfig) (Adapter, error) {


switch clientType {


case ClientTypeWebSocket:


websocketConf := wsclient.WebSocketConfig{



}


return wsclient.NewWebSocketClient(&websocketConf), nil


case ClientTypeQuic:


quicConfig := quicclient.QuicConfig{



}


return quicclient.NewQuicClient(&quicConfig), nil


default:


klog.Errorf(“Client type: %s is not supported”, clientType)


}


return nil, ErrorWrongClientType


}


从 GetClient(…)函数定义可以知道,该函数定义了 ClientTypeWebSocket、ClientTypeQuic 两种 client 类型,两者都实现了 Adapter interface,下面遇到 Adapter 类型的 client 变量时,记得对应此处的 ClientTypeWebSocket、ClientTypeQuic。


cloud client 初始化


err = ehc.chClient.Init()


ehc.chClient.Init()函数对应”获取 cloudhub client“中 ClientTypeWebSocket、ClientTypeQuic 的 Init()方法,想要了解 Init()具体做了哪些事情,感兴趣的同学可以在本文的基础上自行剖析。


1. 向 edgecore 各模块广播已经连接成功的消息


// execute hook func after connect


ehc.pubConnectInfo(true)


2. 将从 cloud 部分收到的消息转发给指定 edge 部分的指定模块


go ehc.routeToEdge()


3. 将从 edge 部分的消息转发给 cloud 部分


go ehc.routeToCloud()


4. 向 cloud 部分发送心跳信息


go ehc.keepalive()


5. 剩下的步骤都是在 edgehub 模块退出时的一些清理操作。


到此 edgecore 组件的 edgehub 模块的剖析就结束了,由于篇幅原因,很多思路比较清楚的就没有展开分析,感兴趣的同学可以在本文的基础上自习剖析。


免费直播课


《KubeEdge 技术详解与实战》


今晚 8:00 开播


第 5 课:KubeEdge EdgeMesh 设计原理


直播链接


https://huaweicloud.bugu.mudu.tv/watch/rm2jzlo5


作者 | 之江实验室端边云操作系统团队


原文链接:http://suo.im/6aIHSZ


2019-12-17 14:311960

评论

发布
暂无评论
发现更多内容

不会还有程序员不知道跳槽季靠这1700道java面试题就能平淌大厂吧

程序知音

Java java面试 后端技术 秋招 Java面试题

开发者有话说|一名高中生的编程之路

Loken

个人成长

天猫精灵DIY--技能应用

六月的雨在InfoQ

天猫精灵 功能模型 9月月更 公共实体 语音交互

Python语法之函数

梦笔生花

ESP32-C3 应用 篇(实例一、通过MQTT协议连接ONENET上报传感器数据,云平台下发灯光调色)

矜辰所致

mqtt ESP32-C3 9月月更 项目应用 OneNet

看大神如何用Maya再现神剧《绝命毒师》主角/场景

Renderbus瑞云渲染农场

跟着卷卷龙一起学Camera--内存池浅析03

卷卷龙

ISP 9月月更

【编程实践】手把手带你利用Python简单实现斐波那契数列

迷彩

斐波那契 9月月更 数列

利用Vim和Github Copilot打造一款强大IDE!

Jackpop

Python语法之元组

梦笔生花

Python 元组 9月月更

Spring 注解 @Resource 与 @Autowired 的区别

六月的雨在InfoQ

注解 依赖注入 @Autowired @Resource 9月月更

Python 入门与基础《刷题篇》(1)

吉师职业混子

9月月更

作业

兜里贼缺钱

开源免费!又一款代码文档生成工具!

Jackpop

Python 入门与基础《刷题篇》(2)

吉师职业混子

9月月更

PLG SaaS 案例:如何实践外链自动增长策略?

程序员泥瓦匠

SaaS

云效DevOps--效能无极限

六月的雨在InfoQ

云效 Codeup 云效流水线 Flow 9月月更 Thoughts

赞美,还是责备?全盘否定孩子的一切是“虐待”!

图灵教育

育儿 教育 脑科学

2021年中国知识图谱软件及服务市场规模超100亿,竞争格局不稳定

易观分析

知识图谱

2021年中国机器学习平台软件及服务市场规模近100亿

易观分析

机器学习

com.alibaba.fastjson 对象转json剔除字段

六月的雨在InfoQ

问题处理 Fastjson index Elastic Search 9月月更

bootstrap input框回车后重新刷新页面问题

六月的雨在InfoQ

bootstrap Input 9月月更 表单提交

赞美,还是责备?全盘否定孩子的一切是“虐待”!

图灵社区

育儿 教育 脑科学

<T>和<?>区别

六月的雨在InfoQ

泛型 Java泛型 9月月更 <T> <?>

【云原生 | 从零开始学Kubernetes】十、k8sPod节点亲和性和反亲和性

泡泡

Docker 云计算 云原生 k8s 9月月更

8万Star!这个开源项目有点强!

Jackpop

Python语法之集合

梦笔生花

集合 9月月更 Python语法

The Architecture of Wechat & SMS

David

架构实战营

极客时间架构训练营模块一作业

李晨

架构

9 月 Jina AI 中文社区面对面

Jina AI

人工智能 开源 工程师

python学习之21天(1)

吉师职业混子

9月月更

KubeEdge源码分析之(四)edgehub_云原生_华为云原生团队_InfoQ精选文章