写点什么

KubeEdge 源码分析之(四)edgehub

  • 2019-12-17
  • 本文字数:4137 字

    阅读完需:约 14 分钟

KubeEdge源码分析之(四)edgehub

前言


本系列的源码分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上进行的。


cloudcore 部分的源码分析是在 kubeedge 源码分析系列之整体架构基础上展开的,如果没有阅读过 kubeedge 源码分析系列之整体架构,直接阅读本文,会感觉比较突兀。


本文对 edgecore 的 edgehub 模块进行剖析,edgehub 作为 edge 部分与 cloud 部分进行交互的门户,有必要将 edgehub 相关内容彻底分析清楚,为使用过程中的故障排查和未来的功能扩展与性能优化都会有很大的帮助。对 edgehub 的剖析,具体包括如下几部分:


  1. edgehub 的 struct 调用链剖析

  2. edgehub 的具体逻辑剖析


edgehub 的 struct 组成剖析


从 edgecore 模块注册函数入手:


kubeedge/edge/cmd/edgecore/app/server.go


// registerModules register all the modules started in edgecore


func registerModules() {


devicetwin.Register()



}


进入 registerModules()函数中的 devicetwin.Register(),具体如下:


kubeedge/edge/pkg/devicetwin/devicetwin.go


// Register register edgehub


func Register() {


core.Register(&EdgeHub{


controller: NewEdgeHubController(),


})


}


顺着 Register()函数中 EdgeHub struct 的实例化语句进入 EdgeHub struct 定义:


kubeedge/edge/pkg/edgehub/module.go


//EdgeHub defines edgehub object structure


type EdgeHub struct {


context *context.Context


controller *Controller


}


EdgeHub struct 中包含*context.Context 和*Controller 两个属性:


  • *context.Context 在 kubeedge 源码分析系列之 cloudcore 中,笔者已经分析过*context.Context,它是一个基于 go-channels 的消息框架,edgecore 用它作为各功能模块之间通信的消息管道。

  • Controller edgehub 的主要功能载体;


进入 Controller struct 的定义:


kubeedge/edge/pkg/edgehub/controller.go


//Controller is EdgeHub controller object


type Controller struct {


context *context.Context


chClient clients.Adapter


config *config.ControllerConfig


stopChan chan struct{}


syncKeeper map[string]chan model.Message


keeperLock sync.RWMutex


}


从 Controller struct 的定义可以确定,Controller struct 是 edgehub 核心功能载体没错了。


到此,edgehub 的 struct 组成剖析就结束了,接下来剖析 edgehub 的具体逻辑。


edgehub 的具体逻辑剖析


回到 edgehub 的注册函数,开始剖析 edgehub 相关的逻辑:


kubeedge/edge/pkg/edgehub/module.go


// Register register edgehub


func Register() {


core.Register(&EdgeHub{


controller: NewEdgeHubController(),


})


}


在 Register()函数中对 EdgeHub struct 的初始化只是对 EdgeHub struct 中的 controller 进行了初始化,进入 controller 的初始化函数:


kubeedge/edge/pkg/edgehub/controller.go


//NewEdgeHubController creates and returns a EdgeHubController object


func NewEdgeHubController() *Controller {


return &Controller{


config: &config.GetConfig().CtrConfig,


stopChan: make(chan struct{}),


syncKeeper: make(map[string]chan model.Message),


}


}


NewEdgeHubController()函数中嵌套了一个获取配置信息的函数调用:


config: &config.GetConfig().CtrConfig


进入 config.GetConfig()函数定义:


kubeedge/edge/pkg/edgehub/config/config.go


var edgeHubConfig EdgeHubConfig



//GetConfig returns the EdgeHub configuration object


func GetConfig() *EdgeHubConfig {


return &edgeHubConfig


}


GetConfig()函数只返回了 &edgeHubConfig,而 edgeHubConfig 是一个 EdgeHubConfig struct 类型的全局变量,至于该变量是在哪里被赋值,怎么赋值的,暂且立个疑问 flag:


EdgeHubConfig 赋值疑问


到此,EdgeHub struct 的初始化就告一段落了,下面分析 EdgeHub 的启动逻辑:


kubeedge/edge/pkg/edgehub/module.go


//Start sets context and starts the controller


func (eh *EdgeHub) Start(c *context.Context) {


eh.context = c


eh.controller.Start©


}


EdgeHub 启动函数 Start(…)只做了 2 件事:


1. 接收并存储传入的消息管道


eh.context = c


2. 启动 EdgeHub 的 controller


eh.controller.Start©


由前面的分析可知 EdgeHub 的 controller 作为 EdgeHub 功能的主要载体,其启动函数也必将囊括 EdgeHub 绝大部分启动逻辑,继续进入 controller 的启动函数定义:


kubeedge/edge/pkg/edgehub/controller.go


//Start will start EdgeHub


func (ehc *Controller) Start(ctx *context.Context) {


config.InitEdgehubConfig()


for {


err := ehc.initial(ctx)



err = ehc.chClient.Init()



// execute hook func after connect


ehc.pubConnectInfo(true)


go ehc.routeToEdge()


go ehc.routeToCloud()


go ehc.keepalive()


// wait the stop singal


// stop authinfo manager/websocket connection


<-ehc.stopChan


ehc.chClient.Uninit()


// execute hook fun after disconnect


ehc.pubConnectInfo(false)


// sleep one period of heartbeat, then try to connect cloud hub again


time.Sleep(ehc.config.HeartbeatPeriod * 2)


// clean channel


clean:


for {


select {


case <-ehc.stopChan:


default:


break clean


}


}


}


}


从 Controller 的启动函数 Start(…)的定义,可以清楚地看到,其中包含了 EdgeHub 的初始化、各种业务 go routine 的启动和最后的退出清理,下面逐个深入剖析:


初始化 EdgehubConfig


config.InitEdgehubConfig()


进入 config.InitEdgehubConfig()函数定义:


kubeedge/edge/pkg/edgehub/config/config.go


// InitEdgehubConfig init edgehub config


func InitEdgehubConfig() {


err := getControllerConfig()



if edgeHubConfig.CtrConfig.Protocol == protocolWebsocket {


err = getWebSocketConfig()



} else if edgeHubConfig.CtrConfig.Protocol == protocolQuic {


err = getQuicConfig()



} else {



}


}


InitEdgehubConfig()函数首先通过 err := getControllerConfig()获得 EdgeHub Controller 的配置信息,然后通过获得的配置信息中的 Protocol 字段来判断是哪个协议,最后根据判断结果获取相应的协议绑定的配置信息或报错。


针对以上获取配置的操作,重点分析获得 EdgeHub Controller 的配置信息,进入 getControllerConfig():


kubeedge/edge/pkg/edgehub/config/config.go


var edgeHubConfig EdgeHubConfig



func getControllerConfig() error {


protocol, err := config.CONFIG.GetValue(“edgehub.controller.protocol”).ToString()



edgeHubConfig.CtrConfig.Protocol = protocol


heartbeat, err := config.CONFIG.GetValue(“edgehub.controller.heartbeat”).ToInt()



edgeHubConfig.CtrConfig.HeartbeatPeriod = time.Duration(heartbeat) * time.Second


projectID, err := config.CONFIG.GetValue(“edgehub.controller.project-id”).ToString()



edgeHubConfig.CtrConfig.ProjectID = projectID


nodeID, err := config.CONFIG.GetValue(“edgehub.controller.node-id”).ToString()



edgeHubConfig.CtrConfig.NodeID = nodeID


return nil


}


getControllerConfig()获取 edgehub.controller.*相关的配置信息并赋值给变量 edgeHubConfig,到此前面立的“EdgeHubConfig 赋值疑问”flag 也得到了解答。


EdgeHub Controller 初始化


err := ehc.initial(ctx)


进入 ehc.initial(…)函数定义:


kubeedge/edge/pkg/edgehub/controller.go


func (ehc *Controller) initial(ctx *context.Context) (err error) {


config.GetConfig().WSConfig.URL, err = bhconfig.CONFIG.GetValue(“edgehub.websocket.url”).ToString()



cloudHubClient, err := clients.GetClient(ehc.config.Protocol, config.GetConfig())



ehc.context = ctx


ehc.chClient = cloudHubClient


return nil


}


  1. 第一行单独获取 edgehub.websocket.url 感觉在前面“初始化 EdgehubConfig“中的 websocket 配置信息初始化部分重复,在此立一个源码问题 flag:

  2. 获取 websocket 配置信息重复

  3. 获取 cloudhub client


cloudHubClient, err := clients.GetClient(ehc.config.Protocol, config.GetConfig())


进入 clients.GetClient(…)定义:


kubeedge/edge/pkg/edgehub/factory.go


//GetClient returns an Adapter object with new web socket


func GetClient(clientType string, config *config.EdgeHubConfig) (Adapter, error) {


switch clientType {


case ClientTypeWebSocket:


websocketConf := wsclient.WebSocketConfig{



}


return wsclient.NewWebSocketClient(&websocketConf), nil


case ClientTypeQuic:


quicConfig := quicclient.QuicConfig{



}


return quicclient.NewQuicClient(&quicConfig), nil


default:


klog.Errorf(“Client type: %s is not supported”, clientType)


}


return nil, ErrorWrongClientType


}


从 GetClient(…)函数定义可以知道,该函数定义了 ClientTypeWebSocket、ClientTypeQuic 两种 client 类型,两者都实现了 Adapter interface,下面遇到 Adapter 类型的 client 变量时,记得对应此处的 ClientTypeWebSocket、ClientTypeQuic。


cloud client 初始化


err = ehc.chClient.Init()


ehc.chClient.Init()函数对应”获取 cloudhub client“中 ClientTypeWebSocket、ClientTypeQuic 的 Init()方法,想要了解 Init()具体做了哪些事情,感兴趣的同学可以在本文的基础上自行剖析。


1. 向 edgecore 各模块广播已经连接成功的消息


// execute hook func after connect


ehc.pubConnectInfo(true)


2. 将从 cloud 部分收到的消息转发给指定 edge 部分的指定模块


go ehc.routeToEdge()


3. 将从 edge 部分的消息转发给 cloud 部分


go ehc.routeToCloud()


4. 向 cloud 部分发送心跳信息


go ehc.keepalive()


5. 剩下的步骤都是在 edgehub 模块退出时的一些清理操作。


到此 edgecore 组件的 edgehub 模块的剖析就结束了,由于篇幅原因,很多思路比较清楚的就没有展开分析,感兴趣的同学可以在本文的基础上自习剖析。


免费直播课


《KubeEdge 技术详解与实战》


今晚 8:00 开播


第 5 课:KubeEdge EdgeMesh 设计原理


直播链接


https://huaweicloud.bugu.mudu.tv/watch/rm2jzlo5


作者 | 之江实验室端边云操作系统团队


原文链接:http://suo.im/6aIHSZ


2019-12-17 14:311940

评论

发布
暂无评论
发现更多内容

环信和阿里云签署云原生合作,携手共建云通讯“新基建”

DT极客

揭示智能边缘重大机遇 英特尔邀产学研推动产业智能升级

E科讯

2020大厂面试一道高频Spring题,90%的Java开发者都拜倒在它脚下!

Java架构师迁哥

新疆采风笔记:送行·出发·火车上

刘新吾

随笔 旅行 新疆

华为云推UGO:一手抓结构迁移,一手抓SQL转换

华为云开发者联盟

golang 表格编程降低圈复杂度

猴子胖胖

表格开发 Go 语言

技术译文|如何将 Pulsar 用作消息队列

Apache Pulsar

开源 云原生 pulsar Apache Pulsar 消息中间件

你一定看得懂的Netty客户端启动源码分析!

Java 编程 Netty 架构师

整合Elastic-Job(支持动态任务)

TaurusCode

springboot SpringCloud 分布式任务调度 Elastic-job

恶补,一文了解 8 种常见的数据结构

Java架构师迁哥

支付平台架构技术实现之终端安全

博文视点Broadview

架构 安全攻防 安全 支付系统 风控

2020年程序员必备的面试重点+面试真题+个人软实力,你学废了吗?

Java架构师迁哥

腾讯架构师:亲手Debug之后,你就知道为何面试问源码了

小Q

Java tomcat 程序员 架构 调优

“大数据+区块链”的智慧城市建设!

CECBC

区块链 大数据

年度开源盛会 ApacheCon 来临,Apache Pulsar 专场大咖齐聚

Apache Pulsar

开源 云原生 Apache Pulsar 消息中间件

海量数据拉升背后的成本困扰:存算分离成美图降本增效新良方

华为云开发者联盟

大数据 华为云 海量数据

从全备中恢复单库或单表,小心有坑!

Simon

MySQL MySQL 运维

分布式系统实践解读丨详解高内聚低耦合

华为云开发者联盟

阿里P8大牛的建议,工作1-5年的Java工程师如何让自己变得更值钱

Java架构之路

Java 编程 程序员 面试

(2)skynet ubuntu下载与安装

休比

anyRTC云端录制功能上线

anyRTC开发者

WebRTC 语音 直播 RTC 安卓

智谱AI首席科学家唐杰团队荣获国际数据挖掘顶会时间检验应用科学奖

DT极客

一个线程池中的线程异常了,那么线程池会怎么处理这个线程?

Java架构师迁哥

时空碰撞优化系列·一

誓约·追光者

hive 数据分析 Sparksql 计算效率 优化

Java引入第三方包的路径问题

谷鱼

路径

flutter 高效开发工具集

Daniel

nginx 实现接口版本控制

程序员与厨子

php nginx laravel 版本控制

区块链用于支付手段只是开端

CECBC

区块链 金融

Java 回调(Callback)接口学习使用

魏杰

华为云IoT智简联接,开启物联世界新纪元

华为云开发者联盟

物联网

深度解析物联网设备的区块链技术

CECBC

区块链 智能合约 物联网

KubeEdge源码分析之(四)edgehub_云原生_华为云原生团队_InfoQ精选文章