前言
本系列的源码分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上进行的。
cloudcore 部分的源码分析是在 kubeedge 源码分析系列之整体架构基础上展开的,如果没有阅读过 kubeedge 源码分析系列之整体架构,直接阅读本文,会感觉比较突兀。
本文对 edgecore 的 edgehub 模块进行剖析,edgehub 作为 edge 部分与 cloud 部分进行交互的门户,有必要将 edgehub 相关内容彻底分析清楚,为使用过程中的故障排查和未来的功能扩展与性能优化都会有很大的帮助。对 edgehub 的剖析,具体包括如下几部分:
edgehub 的 struct 调用链剖析
edgehub 的具体逻辑剖析
edgehub 的 struct 组成剖析
从 edgecore 模块注册函数入手:
kubeedge/edge/cmd/edgecore/app/server.go
// registerModules register all the modules started in edgecore
func registerModules() {
devicetwin.Register()
…
}
进入 registerModules()函数中的 devicetwin.Register(),具体如下:
kubeedge/edge/pkg/devicetwin/devicetwin.go
// Register register edgehub
func Register() {
core.Register(&EdgeHub{
controller: NewEdgeHubController(),
})
}
顺着 Register()函数中 EdgeHub struct 的实例化语句进入 EdgeHub struct 定义:
kubeedge/edge/pkg/edgehub/module.go
//EdgeHub defines edgehub object structure
type EdgeHub struct {
context *context.Context
controller *Controller
}
EdgeHub struct 中包含*context.Context 和*Controller 两个属性:
*context.Context 在 kubeedge 源码分析系列之 cloudcore 中,笔者已经分析过*context.Context,它是一个基于 go-channels 的消息框架,edgecore 用它作为各功能模块之间通信的消息管道。
Controller edgehub 的主要功能载体;
进入 Controller struct 的定义:
kubeedge/edge/pkg/edgehub/controller.go
//Controller is EdgeHub controller object
type Controller struct {
context *context.Context
chClient clients.Adapter
config *config.ControllerConfig
stopChan chan struct{}
syncKeeper map[string]chan model.Message
keeperLock sync.RWMutex
}
从 Controller struct 的定义可以确定,Controller struct 是 edgehub 核心功能载体没错了。
到此,edgehub 的 struct 组成剖析就结束了,接下来剖析 edgehub 的具体逻辑。
edgehub 的具体逻辑剖析
回到 edgehub 的注册函数,开始剖析 edgehub 相关的逻辑:
kubeedge/edge/pkg/edgehub/module.go
// Register register edgehub
func Register() {
core.Register(&EdgeHub{
controller: NewEdgeHubController(),
})
}
在 Register()函数中对 EdgeHub struct 的初始化只是对 EdgeHub struct 中的 controller 进行了初始化,进入 controller 的初始化函数:
kubeedge/edge/pkg/edgehub/controller.go
//NewEdgeHubController creates and returns a EdgeHubController object
func NewEdgeHubController() *Controller {
return &Controller{
config: &config.GetConfig().CtrConfig,
stopChan: make(chan struct{}),
syncKeeper: make(map[string]chan model.Message),
}
}
NewEdgeHubController()函数中嵌套了一个获取配置信息的函数调用:
config: &config.GetConfig().CtrConfig
进入 config.GetConfig()函数定义:
kubeedge/edge/pkg/edgehub/config/config.go
var edgeHubConfig EdgeHubConfig
…
//GetConfig returns the EdgeHub configuration object
func GetConfig() *EdgeHubConfig {
return &edgeHubConfig
}
GetConfig()函数只返回了 &edgeHubConfig,而 edgeHubConfig 是一个 EdgeHubConfig struct 类型的全局变量,至于该变量是在哪里被赋值,怎么赋值的,暂且立个疑问 flag:
EdgeHubConfig 赋值疑问
到此,EdgeHub struct 的初始化就告一段落了,下面分析 EdgeHub 的启动逻辑:
kubeedge/edge/pkg/edgehub/module.go
//Start sets context and starts the controller
func (eh *EdgeHub) Start(c *context.Context) {
eh.context = c
eh.controller.Start©
}
EdgeHub 启动函数 Start(…)只做了 2 件事:
1. 接收并存储传入的消息管道
eh.context = c
2. 启动 EdgeHub 的 controller
eh.controller.Start©
由前面的分析可知 EdgeHub 的 controller 作为 EdgeHub 功能的主要载体,其启动函数也必将囊括 EdgeHub 绝大部分启动逻辑,继续进入 controller 的启动函数定义:
kubeedge/edge/pkg/edgehub/controller.go
//Start will start EdgeHub
func (ehc *Controller) Start(ctx *context.Context) {
config.InitEdgehubConfig()
for {
err := ehc.initial(ctx)
…
err = ehc.chClient.Init()
…
// execute hook func after connect
ehc.pubConnectInfo(true)
go ehc.routeToEdge()
go ehc.routeToCloud()
go ehc.keepalive()
// wait the stop singal
// stop authinfo manager/websocket connection
<-ehc.stopChan
ehc.chClient.Uninit()
// execute hook fun after disconnect
ehc.pubConnectInfo(false)
// sleep one period of heartbeat, then try to connect cloud hub again
time.Sleep(ehc.config.HeartbeatPeriod * 2)
// clean channel
clean:
for {
select {
case <-ehc.stopChan:
default:
break clean
}
}
}
}
从 Controller 的启动函数 Start(…)的定义,可以清楚地看到,其中包含了 EdgeHub 的初始化、各种业务 go routine 的启动和最后的退出清理,下面逐个深入剖析:
初始化 EdgehubConfig
config.InitEdgehubConfig()
进入 config.InitEdgehubConfig()函数定义:
kubeedge/edge/pkg/edgehub/config/config.go
// InitEdgehubConfig init edgehub config
func InitEdgehubConfig() {
err := getControllerConfig()
…
if edgeHubConfig.CtrConfig.Protocol == protocolWebsocket {
err = getWebSocketConfig()
…
} else if edgeHubConfig.CtrConfig.Protocol == protocolQuic {
err = getQuicConfig()
…
} else {
…
}
}
InitEdgehubConfig()函数首先通过 err := getControllerConfig()获得 EdgeHub Controller 的配置信息,然后通过获得的配置信息中的 Protocol 字段来判断是哪个协议,最后根据判断结果获取相应的协议绑定的配置信息或报错。
针对以上获取配置的操作,重点分析获得 EdgeHub Controller 的配置信息,进入 getControllerConfig():
kubeedge/edge/pkg/edgehub/config/config.go
var edgeHubConfig EdgeHubConfig
…
func getControllerConfig() error {
protocol, err := config.CONFIG.GetValue(“edgehub.controller.protocol”).ToString()
…
edgeHubConfig.CtrConfig.Protocol = protocol
heartbeat, err := config.CONFIG.GetValue(“edgehub.controller.heartbeat”).ToInt()
…
edgeHubConfig.CtrConfig.HeartbeatPeriod = time.Duration(heartbeat) * time.Second
projectID, err := config.CONFIG.GetValue(“edgehub.controller.project-id”).ToString()
…
edgeHubConfig.CtrConfig.ProjectID = projectID
nodeID, err := config.CONFIG.GetValue(“edgehub.controller.node-id”).ToString()
…
edgeHubConfig.CtrConfig.NodeID = nodeID
return nil
}
getControllerConfig()获取 edgehub.controller.*相关的配置信息并赋值给变量 edgeHubConfig,到此前面立的“EdgeHubConfig 赋值疑问”flag 也得到了解答。
EdgeHub Controller 初始化
err := ehc.initial(ctx)
进入 ehc.initial(…)函数定义:
kubeedge/edge/pkg/edgehub/controller.go
func (ehc *Controller) initial(ctx *context.Context) (err error) {
config.GetConfig().WSConfig.URL, err = bhconfig.CONFIG.GetValue(“edgehub.websocket.url”).ToString()
…
cloudHubClient, err := clients.GetClient(ehc.config.Protocol, config.GetConfig())
…
ehc.context = ctx
ehc.chClient = cloudHubClient
return nil
}
第一行单独获取 edgehub.websocket.url 感觉在前面“初始化 EdgehubConfig“中的 websocket 配置信息初始化部分重复,在此立一个源码问题 flag:
获取 websocket 配置信息重复
获取 cloudhub client
cloudHubClient, err := clients.GetClient(ehc.config.Protocol, config.GetConfig())
进入 clients.GetClient(…)定义:
kubeedge/edge/pkg/edgehub/factory.go
//GetClient returns an Adapter object with new web socket
func GetClient(clientType string, config *config.EdgeHubConfig) (Adapter, error) {
switch clientType {
case ClientTypeWebSocket:
websocketConf := wsclient.WebSocketConfig{
…
}
return wsclient.NewWebSocketClient(&websocketConf), nil
case ClientTypeQuic:
quicConfig := quicclient.QuicConfig{
…
}
return quicclient.NewQuicClient(&quicConfig), nil
default:
klog.Errorf(“Client type: %s is not supported”, clientType)
}
return nil, ErrorWrongClientType
}
从 GetClient(…)函数定义可以知道,该函数定义了 ClientTypeWebSocket、ClientTypeQuic 两种 client 类型,两者都实现了 Adapter interface,下面遇到 Adapter 类型的 client 变量时,记得对应此处的 ClientTypeWebSocket、ClientTypeQuic。
cloud client 初始化
err = ehc.chClient.Init()
ehc.chClient.Init()函数对应”获取 cloudhub client“中 ClientTypeWebSocket、ClientTypeQuic 的 Init()方法,想要了解 Init()具体做了哪些事情,感兴趣的同学可以在本文的基础上自行剖析。
1. 向 edgecore 各模块广播已经连接成功的消息
// execute hook func after connect
ehc.pubConnectInfo(true)
2. 将从 cloud 部分收到的消息转发给指定 edge 部分的指定模块
go ehc.routeToEdge()
3. 将从 edge 部分的消息转发给 cloud 部分
go ehc.routeToCloud()
4. 向 cloud 部分发送心跳信息
go ehc.keepalive()
5. 剩下的步骤都是在 edgehub 模块退出时的一些清理操作。
到此 edgecore 组件的 edgehub 模块的剖析就结束了,由于篇幅原因,很多思路比较清楚的就没有展开分析,感兴趣的同学可以在本文的基础上自习剖析。
免费直播课
《KubeEdge 技术详解与实战》
今晚 8:00 开播
第 5 课:KubeEdge EdgeMesh 设计原理
直播链接
https://huaweicloud.bugu.mudu.tv/watch/rm2jzlo5
作者 | 之江实验室端边云操作系统团队
原文链接:http://suo.im/6aIHSZ
评论