写点什么

KubeEdge 源码分析之(六)metamanager

  • 2020-01-03
  • 本文字数:3874 字

    阅读完需:约 13 分钟

KubeEdge源码分析之(六)metamanager

本系列的源码分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上进行的。

cloudcore 部分的源码分析是在 kubeedge 源码分析系列之整体架构基础上展开的,如果没有阅读过 kubeedge 源码分析系列之整体架构,直接阅读本文,会感觉比较突兀。


本文对 edgecore 的 metamanager 模块进行剖析,metamanager 作为 edgecore 中的 edged 模块与 edgehub 模块进行交互的桥梁,除了将 edgehub 的消息转发给 edged,还对一些必要的数据通过 SQLite 进行了缓存,在某种程度上实现了 kubeedge 的 offline mode。本文就对 metamanager 所涉及的 SQLite 数据库相关逻辑和业务逻辑进行剖析:


1、metamanager 数据库相关逻辑剖析


2、metamanager 业务逻辑剖析

metamanager 数据库相关逻辑剖析

从 eventbus 的模块注册函数入手:


kubeedge/edge/pkg/eventbus/event_bus.go


//constant metamanager module name const (                        MetaManagerModuleName = "metaManager" ) ... // Register register metamanager func Register() {                          dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))               core.Register(&metaManager{}) }
复制代码


在注册函数 Register()中,做了 2 件事:


  1. 在 SQLite 中的数据库中初始化 metaManager 表


dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))
复制代码


  1. 注册已经初始化的 metamanager


core.Register(&metaManager{})
复制代码


下面深入剖析 “1.在 SQLite 中的数据库中初始化 metaManager 表” 相关内容,进入 dbm.RegisterModel(…):


kubeedge/edge/pkg/common/dbm/db.go


//RegisterModel registers the defined model in the orm if model is enabled func RegisterModel(moduleName string, m interface{}) {               if isModuleEnabled(moduleName) {                        orm.RegisterModel(m)                        ...              } else {                       ...            } }

复制代码


RegisterModel(…)函数是对 github.com/astaxie/bee…的封装,本文就不跟进去剖析了,感兴趣的同学可以在本文的基础上自行剖析。


回到“1.在 SQLite 中的数据库中初始化 metaManager 表”,深入剖析 metaManager 表的具体定义 dao.Meta,进入 dao.Meta 定义:


kubeedge/edge/pkg/metamanager/dao/meta.go


// Meta metadata object type Meta struct {              // ID    int64  `orm:"pk; auto; column(id)"`            Key   string `orm:"column(key); size(256); pk"`             Type  string `orm:"column(type); size(32)"`             Value string `orm:"column(value); null; type(text)"`}
复制代码


metaManager 表的具体定义包含 Key、Type 和 Value 三个字段,具体含义如下:


  • Key meta 的名字;

  • Type meta 对应的操作类型;

  • Value 具体的 meta 值;


与 Meta Struct 的定义在同一文件内,还有对 metaManager 表的一些操作定义,如 SaveMeta、DeleteMetaByKey、UpdateMeta、InsertOrUpdate、UpdateMetaField、UpdateMetaFields、QueryMeta、QueryAllMeta,本文不对具体操作的定义进行深入剖析,感兴趣的同学可以在本文的基础上自行剖析。

metamanager 业务逻辑剖析

从 metamanager 的模块启动函数入手:


kubeedge/edge/pkg/metamanager/module.go


func (m *metaManager) Start(c *context.Context) {              m.context = c              InitMetaManagerConfig()              go func() {                           period := getSyncInterval()                           timer := time.NewTimer(period)                          for {                                      select {                                      case <-timer.C:                                                  timer.Reset(period)                                                   msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)                                                  m.context.Send(MetaManagerModuleName, *msg)                                     }                        }            }()            m.mainLoop() }
复制代码


启动函数 Start(…)做了如下 4 件事:


  1. 接收并保存模块启动时传入的*context.Context 实例


m.context = c
复制代码


  1. 初始化 metamanager 配置


InitMetaManagerConfig()
复制代码


  1. 启动一个 goroutine 同步心跳信息


go func() {...}
复制代码


  1. 启动一个循环处理各种事件


m.mainLoop()
复制代码


接下来展开分析 2、3、4。


初始化 metamanager 配置


进入 InitMetaManagerConfig()定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


// InitMetaManagerConfig init meta config func InitMetaManagerConfig() {                    var err error                    groupName, err := config.CONFIG.GetValue("metamanager.context-send-group").ToString()  ...                  edgeSite, err := config.CONFIG.GetValue("metamanager.edgesite").ToBool() ...                  moduleName, err := config.CONFIG.GetValue("metamanager.context-send-module").ToString()... }

复制代码


在初始化 metamanager 配置时,从配置文件中获取了 metamanager.context-send-group、metamanager.edgesite、metamanager.context-send-module,根据获取的值对相关变量进行设置。


启动一个 goroutine 同步心跳信息


kubeedge/edge/pkg/metamanager/module.go


go func() {              period := getSyncInterval()               timer := time.NewTimer(period)               for {                           select {                           case <-timer.C:                                          timer.Reset(period)                                          msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)                                          m.context.Send(MetaManagerModuleName, *msg)                         }                    }           }
复制代码


在同步心跳信息的 goroutine 中,做了如下 2 件事:


  1. 获取通信心跳的时间间隔


period := getSyncInterval()
复制代码


  1. 创建定时器,并定时发送心跳信息


timer := time.NewTimer(period) for {...}
复制代码


启动一个循环处理各种事件


进入 m.mainLoop()定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


func (m *metaManager) mainLoop() {               go func() {                            for {                                                 if msg, err := m.context.Receive(m.Name()); err == nil {                                                 ...                                                m.process(msg)                            } else {                                                 ...                            }             }             }() }

复制代码


mainLoop()函数启动了一个 for 循环,在循环中主要做了 2 件事:


  1. 接收信息


msg, err := m.context.Receive(m.Name())
复制代码


  1. 对接收到的信息进行处理


m.process(msg)
复制代码


想弄明白对信息的处理过程,需要进入 m.process(…)的定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


func (m *metaManager) process(message model.Message) {                    operation := message.GetOperation()              switch operation {              case model.InsertOperation:                                m.processInsert(message)              case model.UpdateOperation:                               m.processUpdate(message)            case model.DeleteOperation:                                m.processDelete(message)            case model.QueryOperation:                                m.processQuery(message)             case model.ResponseOperation:                              m.processResponse(message)             case messagepkg.OperationNodeConnection:                               m.processNodeConnection(message)            case OperationMetaSync:                               m.processSync(message)            case OperationFunctionAction:                               m.processFunctionAction(message)            case OperationFunctionActionResult:                               m.processFunctionActionResult(message)             case constants.CSIOperationTypeCreateVolume,                     constants.CSIOperationTypeDeleteVolume,                    constants.CSIOperationTypeControllerPublishVolume,                    constants.CSIOperationTypeControllerUnpublishVolume:                    m.processVolume(message)}}
复制代码


process(…)函数中主要做了如下 2 件事:


  1. 获取消息的操作的类型


operation := message.GetOperation()
复制代码


  1. 根据信息操作类型对信息进行相应处理


switch operation { ... }
复制代码


信息的操作类型包括 insert、update、delete、query、response、publish、meta-internal-sync、action、action_result 等,本文不对信息的具体处理过程剖析,感兴趣的同学可以在本文的基础上自行剖析。


2020-01-03 10:481854

评论

发布
暂无评论
发现更多内容

EMQ&南洋万邦云边一体化方案:激活数据潜力,打造智慧工业园区

EMQ映云科技

物联网 IoT 工业互联网 智能制造 企业号 3 月 PK 榜

Go Slice 扩容的这些坑你踩过吗?

王中阳Go

Go golang 高效工作 学习方法 面试题

代码质量与安全 | 免费的静态分析工具好吗?

龙智—DevSecOps解决方案

SAST 静态代码扫描 DAST

代码实战带你了解深度学习中的混合精度训练

华为云开发者联盟

人工智能 深度学习 华为云 华为云开发者联盟 企业号 3 月 PK 榜

GPT-4:不open的OpenAI,终于不再编造事实

鼎道智联

openai ChatGPT4

AI笔刷怎样导入?adobe ai笔刷安装教程

Rose

AI画笔 AI教程 Illustrator 2023 下载 AI中文版

如何基于 Apache Doris 与 Apache Flink 快速构建极速易用的实时数仓

SelectDB

flink 数据湖 实时数仓 Doris 数据库、

见技术大佬,领惊喜好礼!快来领取数据库峰会邀请函!

InfoQ写作社区官方

数据库 云原生 阿里 热门活动 阿里云瑶池数据库峰会

TechBits | TCP 使用 WireShark 进行抓包

Java你猿哥

Java 后端 ssm

阿里巴巴灵魂一问:说说触发HashMap死循环根因

Java你猿哥

Java jdk 后端 ssm

深入理解关键字volatile

小小怪下士

Java 程序员 volatile 关键字

Atlassian Server用户新选择 | 迁移到数据中心版前,您需要做这些准备(1)

龙智—DevSecOps解决方案

Atlassian Atlassian迁移 数据中心版 server版

StyleGAN 生成 AI 虚拟人脸,再也不怕侵犯肖像权

江户川码农

人工智能 AI 图像处理 StyleGAN 人脸生成

简单小巧的右键助手:MouseBoost for Mac让您的工作效率大幅度提高

Rose

mac效率工具 右键助手 MouseBoost激活版

CorelDRAW Graphics Suite2023功能介绍

茶色酒

cdr2023

如何利用ChatGPT搞科研?

Openlab_cosmoplat

人工智能 开源社区 ChatGPT

币安欧意交易所合约跟单平台软件开发详情(api对接)

开发微hkkf5566

Apache Doris 1.2.3 Release 版本正式发布

SelectDB

数据仓库 数据湖 Doris 数据湖Catalog catalog

中小企业运维安全审计用什么软件好?有推荐吗?

行云管家

信息安全 堡垒机 运维审计

Perforce研讨会回顾 | Helix Core在芯片行业的应用实例:芯片项目的版本控制、持续集成及自动化

龙智—DevSecOps解决方案

ci cicd 版本控制 持续集成 芯片开发

Neural Filters神经滤镜插件如何安装?PS神经滤镜插件安装教程

Rose

mac系统 Neural Filters PS滤镜插件 PS20221下载

MQTT 5.0特性Inflight Window&Message Queue

EMQ映云科技

物联网 IoT mqtt emqx 企业号 3 月 PK 榜

京东三面:说说synchronized和volatile的区别

Java你猿哥

Java 面试 ssm 面经 Java多线程

Portraiture最新版插件新增哪些功能?

茶色酒

Portraiture4

硬核!腾讯大佬最新手打的Spring Boot笔记,从原理到实战再到源码

Java你猿哥

Java Spring Boot 后端 面经

美团二面特点:喜欢写一个 SQL 语句,然后问你加了哪些锁!

Java你猿哥

Java 数据库 sql ssm InnoDB存储引擎

GO语言集成开发: GoLand 2022 中文激活版

真大的脸盆

Mac 代码开发 Mac 软件 代码编辑 代码编辑工具

Java体系最强干货分享—挑战40天准备Java面试,最快拿到offer!

Java你猿哥

Java 后端 ssm 面经 春招

【IT小知识】扩容是什么意思?扩容的近义词有哪些?

行云管家

扩容 IT运维

KubeEdge源码分析之(六)metamanager_架构_华为云原生团队_InfoQ精选文章