50万奖金+官方证书,深圳国际金融科技大赛正式启动,点击报名 了解详情
写点什么

KubeEdge 源码分析之(六)metamanager

  • 2020-01-03
  • 本文字数:3874 字

    阅读完需:约 13 分钟

KubeEdge源码分析之(六)metamanager

本系列的源码分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上进行的。

cloudcore 部分的源码分析是在 kubeedge 源码分析系列之整体架构基础上展开的,如果没有阅读过 kubeedge 源码分析系列之整体架构,直接阅读本文,会感觉比较突兀。


本文对 edgecore 的 metamanager 模块进行剖析,metamanager 作为 edgecore 中的 edged 模块与 edgehub 模块进行交互的桥梁,除了将 edgehub 的消息转发给 edged,还对一些必要的数据通过 SQLite 进行了缓存,在某种程度上实现了 kubeedge 的 offline mode。本文就对 metamanager 所涉及的 SQLite 数据库相关逻辑和业务逻辑进行剖析:


1、metamanager 数据库相关逻辑剖析


2、metamanager 业务逻辑剖析

metamanager 数据库相关逻辑剖析

从 eventbus 的模块注册函数入手:


kubeedge/edge/pkg/eventbus/event_bus.go


//constant metamanager module name const (                        MetaManagerModuleName = "metaManager" ) ... // Register register metamanager func Register() {                          dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))               core.Register(&metaManager{}) }
复制代码


在注册函数 Register()中,做了 2 件事:


  1. 在 SQLite 中的数据库中初始化 metaManager 表


dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))
复制代码


  1. 注册已经初始化的 metamanager


core.Register(&metaManager{})
复制代码


下面深入剖析 “1.在 SQLite 中的数据库中初始化 metaManager 表” 相关内容,进入 dbm.RegisterModel(…):


kubeedge/edge/pkg/common/dbm/db.go


//RegisterModel registers the defined model in the orm if model is enabled func RegisterModel(moduleName string, m interface{}) {               if isModuleEnabled(moduleName) {                        orm.RegisterModel(m)                        ...              } else {                       ...            } }

复制代码


RegisterModel(…)函数是对 github.com/astaxie/bee…的封装,本文就不跟进去剖析了,感兴趣的同学可以在本文的基础上自行剖析。


回到“1.在 SQLite 中的数据库中初始化 metaManager 表”,深入剖析 metaManager 表的具体定义 dao.Meta,进入 dao.Meta 定义:


kubeedge/edge/pkg/metamanager/dao/meta.go


// Meta metadata object type Meta struct {              // ID    int64  `orm:"pk; auto; column(id)"`            Key   string `orm:"column(key); size(256); pk"`             Type  string `orm:"column(type); size(32)"`             Value string `orm:"column(value); null; type(text)"`}
复制代码


metaManager 表的具体定义包含 Key、Type 和 Value 三个字段,具体含义如下:


  • Key meta 的名字;

  • Type meta 对应的操作类型;

  • Value 具体的 meta 值;


与 Meta Struct 的定义在同一文件内,还有对 metaManager 表的一些操作定义,如 SaveMeta、DeleteMetaByKey、UpdateMeta、InsertOrUpdate、UpdateMetaField、UpdateMetaFields、QueryMeta、QueryAllMeta,本文不对具体操作的定义进行深入剖析,感兴趣的同学可以在本文的基础上自行剖析。

metamanager 业务逻辑剖析

从 metamanager 的模块启动函数入手:


kubeedge/edge/pkg/metamanager/module.go


func (m *metaManager) Start(c *context.Context) {              m.context = c              InitMetaManagerConfig()              go func() {                           period := getSyncInterval()                           timer := time.NewTimer(period)                          for {                                      select {                                      case <-timer.C:                                                  timer.Reset(period)                                                   msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)                                                  m.context.Send(MetaManagerModuleName, *msg)                                     }                        }            }()            m.mainLoop() }
复制代码


启动函数 Start(…)做了如下 4 件事:


  1. 接收并保存模块启动时传入的*context.Context 实例


m.context = c
复制代码


  1. 初始化 metamanager 配置


InitMetaManagerConfig()
复制代码


  1. 启动一个 goroutine 同步心跳信息


go func() {...}
复制代码


  1. 启动一个循环处理各种事件


m.mainLoop()
复制代码


接下来展开分析 2、3、4。


初始化 metamanager 配置


进入 InitMetaManagerConfig()定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


// InitMetaManagerConfig init meta config func InitMetaManagerConfig() {                    var err error                    groupName, err := config.CONFIG.GetValue("metamanager.context-send-group").ToString()  ...                  edgeSite, err := config.CONFIG.GetValue("metamanager.edgesite").ToBool() ...                  moduleName, err := config.CONFIG.GetValue("metamanager.context-send-module").ToString()... }

复制代码


在初始化 metamanager 配置时,从配置文件中获取了 metamanager.context-send-group、metamanager.edgesite、metamanager.context-send-module,根据获取的值对相关变量进行设置。


启动一个 goroutine 同步心跳信息


kubeedge/edge/pkg/metamanager/module.go


go func() {              period := getSyncInterval()               timer := time.NewTimer(period)               for {                           select {                           case <-timer.C:                                          timer.Reset(period)                                          msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)                                          m.context.Send(MetaManagerModuleName, *msg)                         }                    }           }
复制代码


在同步心跳信息的 goroutine 中,做了如下 2 件事:


  1. 获取通信心跳的时间间隔


period := getSyncInterval()
复制代码


  1. 创建定时器,并定时发送心跳信息


timer := time.NewTimer(period) for {...}
复制代码


启动一个循环处理各种事件


进入 m.mainLoop()定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


func (m *metaManager) mainLoop() {               go func() {                            for {                                                 if msg, err := m.context.Receive(m.Name()); err == nil {                                                 ...                                                m.process(msg)                            } else {                                                 ...                            }             }             }() }

复制代码


mainLoop()函数启动了一个 for 循环,在循环中主要做了 2 件事:


  1. 接收信息


msg, err := m.context.Receive(m.Name())
复制代码


  1. 对接收到的信息进行处理


m.process(msg)
复制代码


想弄明白对信息的处理过程,需要进入 m.process(…)的定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


func (m *metaManager) process(message model.Message) {                    operation := message.GetOperation()              switch operation {              case model.InsertOperation:                                m.processInsert(message)              case model.UpdateOperation:                               m.processUpdate(message)            case model.DeleteOperation:                                m.processDelete(message)            case model.QueryOperation:                                m.processQuery(message)             case model.ResponseOperation:                              m.processResponse(message)             case messagepkg.OperationNodeConnection:                               m.processNodeConnection(message)            case OperationMetaSync:                               m.processSync(message)            case OperationFunctionAction:                               m.processFunctionAction(message)            case OperationFunctionActionResult:                               m.processFunctionActionResult(message)             case constants.CSIOperationTypeCreateVolume,                     constants.CSIOperationTypeDeleteVolume,                    constants.CSIOperationTypeControllerPublishVolume,                    constants.CSIOperationTypeControllerUnpublishVolume:                    m.processVolume(message)}}
复制代码


process(…)函数中主要做了如下 2 件事:


  1. 获取消息的操作的类型


operation := message.GetOperation()
复制代码


  1. 根据信息操作类型对信息进行相应处理


switch operation { ... }
复制代码


信息的操作类型包括 insert、update、delete、query、response、publish、meta-internal-sync、action、action_result 等,本文不对信息的具体处理过程剖析,感兴趣的同学可以在本文的基础上自行剖析。


2020-01-03 10:481850

评论

发布
暂无评论
发现更多内容

一文带你直观感受,BPM管理系统如何在低代码平台实现搭建

加入高科技仿生人

低代码 数字化 系统开发 BPM

酷家乐x极盾科技:“智能安全决策平台”助力日均十亿级日志分析

极盾科技

数据安全

今日分享丨inBuilder低代码平台有关前端的“道、法、术、器”

inBuilder低代码平台

前端 低代码平台

【干货集】PCBA板边器件布局重要性

华秋PCB

工具 电路 PCB 布局 PCB设计

流行的DJ音乐混音软件:X Djing - Music Mix Maker for Mac

真大的脸盆

Mac Mac 软件 音乐混音软件

华为云大数据治理轻量级解决方案为中长尾企业赋能

轶天下事

业务出海,华为云全球加速服务GA助一臂之力

轶天下事

案例实践|云智慧ITSM产品在利星行汽车的运维实践

云智慧AIOps社区

运维 ITSM ITSM软件 ITSM解决方案 IT 运维

Alibaba开发十年,写出这本“MQ技术手册”,看完我愣住了

程序知音

Java RocketMQ 消息中间件 Java进阶 后端技术

开启数字化,传统工厂该如何布局?

优秀

数字化 数字工厂

重塑财务计划,拥抱全面预算管理的未来

智达方通

业财融合 全面预算管理 财务计划

阿里云 EMAS & 魔笔:4月产品动态

移动研发平台EMAS

阿里云 DevOps 消息推送 低代码平台 兼容性测试

神奇,声网Web SDK还能这么实现直播中美颜功能

声网

前端 Web RTC 美颜

开源工具系列7:Kube-bench

HummerCloud

Kubernetes 云原生安全

软件测试 | 编程语言中的Interface

测吧(北京)科技有限公司

测试

Django认证系统

测吧(北京)科技有限公司

测试

看火山引擎DataLeap如何做好电商治理(一):挑战与痛点

字节跳动数据平台

监控 模型 电商 数据平台 DataLeap

企业办公转型的出路在哪里?华为云桌面开创办公新形式

轶天下事

JAVA快速开发框架 一键生成表单模板代码

力软低代码开发平台

数据库运维实操优质文章分享(含Oracle、MySQL等) | 2023年4月刊

墨天轮

MySQL 数据库 oracle postgresql 国产数据库

ChatGPT会如何影响我们,会让我们失业吗?兼与吴军博士商榷 | 社区征文

李韧

人工智能 ChatGPT 三周年征文

iOS MachineLearning 系列(13)—— 语音与音频相关的AI能力

珲少

接口测试

测吧(北京)科技有限公司

测试

Alibaba技术官熬夜肝出的,Kafka“限量笔记”牛掰!

程序知音

Java kafka java架构 Java进阶 后端技术

mac软件卸载不干净怎么办?

真大的脸盆

Mac Mac 软件 软件卸载工具 卸载软件

10年IT老兵亲述SpringCloud开发从入门到实战文档

程序知音

Java 微服务 java架构 Java进阶 spring-cloud

安卓机上 4G 内存跑 alpaca,欢迎试用轻量级 LLM 模型推理框架 InferLLM

MegEngineBot

开源 大模型 MegEngine LLM

联合索引该如何选择合适的列?

江南一点雨

MySQL

数字化管理时代来临,瓴羊Quick BI、帆软Fine BI领跑国产BI市场

对不起该用户已成仙‖

JMeter实时性能监控平台实战

测吧(北京)科技有限公司

测试

KubeEdge源码分析之(六)metamanager_架构_华为云原生团队_InfoQ精选文章