AICon 北京站 Keynote 亮点揭秘,想了解 Agent 智能体来就对了! 了解详情
写点什么

KubeEdge 源码分析之(六)metamanager

  • 2020-01-03
  • 本文字数:3874 字

    阅读完需:约 13 分钟

KubeEdge源码分析之(六)metamanager

本系列的源码分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上进行的。

cloudcore 部分的源码分析是在 kubeedge 源码分析系列之整体架构基础上展开的,如果没有阅读过 kubeedge 源码分析系列之整体架构,直接阅读本文,会感觉比较突兀。


本文对 edgecore 的 metamanager 模块进行剖析,metamanager 作为 edgecore 中的 edged 模块与 edgehub 模块进行交互的桥梁,除了将 edgehub 的消息转发给 edged,还对一些必要的数据通过 SQLite 进行了缓存,在某种程度上实现了 kubeedge 的 offline mode。本文就对 metamanager 所涉及的 SQLite 数据库相关逻辑和业务逻辑进行剖析:


1、metamanager 数据库相关逻辑剖析


2、metamanager 业务逻辑剖析

metamanager 数据库相关逻辑剖析

从 eventbus 的模块注册函数入手:


kubeedge/edge/pkg/eventbus/event_bus.go


//constant metamanager module name const (                        MetaManagerModuleName = "metaManager" ) ... // Register register metamanager func Register() {                          dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))               core.Register(&metaManager{}) }
复制代码


在注册函数 Register()中,做了 2 件事:


  1. 在 SQLite 中的数据库中初始化 metaManager 表


dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))
复制代码


  1. 注册已经初始化的 metamanager


core.Register(&metaManager{})
复制代码


下面深入剖析 “1.在 SQLite 中的数据库中初始化 metaManager 表” 相关内容,进入 dbm.RegisterModel(…):


kubeedge/edge/pkg/common/dbm/db.go


//RegisterModel registers the defined model in the orm if model is enabled func RegisterModel(moduleName string, m interface{}) {               if isModuleEnabled(moduleName) {                        orm.RegisterModel(m)                        ...              } else {                       ...            } }

复制代码


RegisterModel(…)函数是对 github.com/astaxie/bee…的封装,本文就不跟进去剖析了,感兴趣的同学可以在本文的基础上自行剖析。


回到“1.在 SQLite 中的数据库中初始化 metaManager 表”,深入剖析 metaManager 表的具体定义 dao.Meta,进入 dao.Meta 定义:


kubeedge/edge/pkg/metamanager/dao/meta.go


// Meta metadata object type Meta struct {              // ID    int64  `orm:"pk; auto; column(id)"`            Key   string `orm:"column(key); size(256); pk"`             Type  string `orm:"column(type); size(32)"`             Value string `orm:"column(value); null; type(text)"`}
复制代码


metaManager 表的具体定义包含 Key、Type 和 Value 三个字段,具体含义如下:


  • Key meta 的名字;

  • Type meta 对应的操作类型;

  • Value 具体的 meta 值;


与 Meta Struct 的定义在同一文件内,还有对 metaManager 表的一些操作定义,如 SaveMeta、DeleteMetaByKey、UpdateMeta、InsertOrUpdate、UpdateMetaField、UpdateMetaFields、QueryMeta、QueryAllMeta,本文不对具体操作的定义进行深入剖析,感兴趣的同学可以在本文的基础上自行剖析。

metamanager 业务逻辑剖析

从 metamanager 的模块启动函数入手:


kubeedge/edge/pkg/metamanager/module.go


func (m *metaManager) Start(c *context.Context) {              m.context = c              InitMetaManagerConfig()              go func() {                           period := getSyncInterval()                           timer := time.NewTimer(period)                          for {                                      select {                                      case <-timer.C:                                                  timer.Reset(period)                                                   msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)                                                  m.context.Send(MetaManagerModuleName, *msg)                                     }                        }            }()            m.mainLoop() }
复制代码


启动函数 Start(…)做了如下 4 件事:


  1. 接收并保存模块启动时传入的*context.Context 实例


m.context = c
复制代码


  1. 初始化 metamanager 配置


InitMetaManagerConfig()
复制代码


  1. 启动一个 goroutine 同步心跳信息


go func() {...}
复制代码


  1. 启动一个循环处理各种事件


m.mainLoop()
复制代码


接下来展开分析 2、3、4。


初始化 metamanager 配置


进入 InitMetaManagerConfig()定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


// InitMetaManagerConfig init meta config func InitMetaManagerConfig() {                    var err error                    groupName, err := config.CONFIG.GetValue("metamanager.context-send-group").ToString()  ...                  edgeSite, err := config.CONFIG.GetValue("metamanager.edgesite").ToBool() ...                  moduleName, err := config.CONFIG.GetValue("metamanager.context-send-module").ToString()... }

复制代码


在初始化 metamanager 配置时,从配置文件中获取了 metamanager.context-send-group、metamanager.edgesite、metamanager.context-send-module,根据获取的值对相关变量进行设置。


启动一个 goroutine 同步心跳信息


kubeedge/edge/pkg/metamanager/module.go


go func() {              period := getSyncInterval()               timer := time.NewTimer(period)               for {                           select {                           case <-timer.C:                                          timer.Reset(period)                                          msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)                                          m.context.Send(MetaManagerModuleName, *msg)                         }                    }           }
复制代码


在同步心跳信息的 goroutine 中,做了如下 2 件事:


  1. 获取通信心跳的时间间隔


period := getSyncInterval()
复制代码


  1. 创建定时器,并定时发送心跳信息


timer := time.NewTimer(period) for {...}
复制代码


启动一个循环处理各种事件


进入 m.mainLoop()定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


func (m *metaManager) mainLoop() {               go func() {                            for {                                                 if msg, err := m.context.Receive(m.Name()); err == nil {                                                 ...                                                m.process(msg)                            } else {                                                 ...                            }             }             }() }

复制代码


mainLoop()函数启动了一个 for 循环,在循环中主要做了 2 件事:


  1. 接收信息


msg, err := m.context.Receive(m.Name())
复制代码


  1. 对接收到的信息进行处理


m.process(msg)
复制代码


想弄明白对信息的处理过程,需要进入 m.process(…)的定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


func (m *metaManager) process(message model.Message) {                    operation := message.GetOperation()              switch operation {              case model.InsertOperation:                                m.processInsert(message)              case model.UpdateOperation:                               m.processUpdate(message)            case model.DeleteOperation:                                m.processDelete(message)            case model.QueryOperation:                                m.processQuery(message)             case model.ResponseOperation:                              m.processResponse(message)             case messagepkg.OperationNodeConnection:                               m.processNodeConnection(message)            case OperationMetaSync:                               m.processSync(message)            case OperationFunctionAction:                               m.processFunctionAction(message)            case OperationFunctionActionResult:                               m.processFunctionActionResult(message)             case constants.CSIOperationTypeCreateVolume,                     constants.CSIOperationTypeDeleteVolume,                    constants.CSIOperationTypeControllerPublishVolume,                    constants.CSIOperationTypeControllerUnpublishVolume:                    m.processVolume(message)}}
复制代码


process(…)函数中主要做了如下 2 件事:


  1. 获取消息的操作的类型


operation := message.GetOperation()
复制代码


  1. 根据信息操作类型对信息进行相应处理


switch operation { ... }
复制代码


信息的操作类型包括 insert、update、delete、query、response、publish、meta-internal-sync、action、action_result 等,本文不对信息的具体处理过程剖析,感兴趣的同学可以在本文的基础上自行剖析。


2020-01-03 10:481777

评论

发布
暂无评论
发现更多内容

YashanDB数据库数据迁移的步骤与注意事项

数据库砖家

YashanDB数据库数据迁移与同步实操教程

数据库砖家

YashanDB数据库数据压缩与存储优化方法

数据库砖家

嵌入式软件算法之PID闭环控制原理

芯动大师

嵌入式 PID 调参

YashanDB数据库数据同步与异地容灾解决方案

数据库砖家

YashanDB数据库数据一致性保障机制详解

数据库砖家

YashanDB数据库未来发展趋势及技术展望

数据库砖家

AI 题库系统的主要功能

北京木奇移动技术有限公司

AI技术开发 软件外包公司 AI题库

最快24小时极速换新家,华为鸿蒙智家后装解决方案亮相HDC

新消费日报

线上会议丨离散元仿真技术(DEM)如何解锁食品制造新‘食’代"?

Altair RapidMiner

AI 数据分析 制造业 EDEM 离散元仿真

YashanDB数据库数据迁移及兼容性解决方案

数据库砖家

白噪音Noizio for Mac,舒缓你的心情,提升工作效率!

Rose

为什么说大家低估了 AI 的实际使用规模?实际情况如何?

Baihai IDP

人工智能 程序员 AI

电商商品详情API接口:解锁电商创新与效率的新引擎

Noah

巨量IP代理“狂欢618 年中大放送”活动火热进行中

巨量HTTP

静态IP 代理IP http代理 socks5代理 隧道代理

鸿蒙星闪,智能生活交响乐的指挥家

脑极体

AI

adobe pr 2025 Mac中文激活补丁及安装教程 兼容M/intel

Rose

switchresx mac 屏幕分辨率修改工具 附switchresx注册安装教程

Rose

Mac 虚拟机工具 VMware Fusion Pro 13中文版 含激活码 附详细安装教程

Rose

Golang基础笔记三之数组和切片

Hunter熊

golang 数组 切片 切片扩容 扩容规律

如何通过CrossOver软件运行Windows游戏?CrossOver安装steam教程

阿拉灯神丁

macos steam CrossOver Mac下载 Mac游戏推荐 雷神加速器

YashanDB数据库数据加密与访问控制实用指南

数据库砖家

YashanDB数据库数据迁移实用方案及操作步骤

数据库砖家

Music Tag Editor Mac 批量的处理音乐的标签

Rose

YashanDB数据库数据压缩技术及应用场景

数据库砖家

如何安装SideFX Houdini?3D影视特效渲染SideFX Houdini Mac详细图文教程分享

Rose

mac电脑上U盘启动盘制作工具 balenaEtcher免费版

Rose

AI 对冲基金模拟系统

qife

金融科技 量化交易 AI投资

极简开发、精准分发,构建鸿蒙生态服务分发智能便捷新体验

新消费日报

YashanDB数据库数据迁移全流程实操指南

数据库砖家

3D灯光贴图渲染HDR Light Studio Xenon 破解补丁版 兼容Mac26系统及M芯片

Rose

KubeEdge源码分析之(六)metamanager_架构_华为云原生团队_InfoQ精选文章