写点什么

KubeEdge 源码分析之(六)metamanager

  • 2020-01-03
  • 本文字数:3874 字

    阅读完需:约 13 分钟

KubeEdge源码分析之(六)metamanager

本系列的源码分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上进行的。

cloudcore 部分的源码分析是在 kubeedge 源码分析系列之整体架构基础上展开的,如果没有阅读过 kubeedge 源码分析系列之整体架构,直接阅读本文,会感觉比较突兀。


本文对 edgecore 的 metamanager 模块进行剖析,metamanager 作为 edgecore 中的 edged 模块与 edgehub 模块进行交互的桥梁,除了将 edgehub 的消息转发给 edged,还对一些必要的数据通过 SQLite 进行了缓存,在某种程度上实现了 kubeedge 的 offline mode。本文就对 metamanager 所涉及的 SQLite 数据库相关逻辑和业务逻辑进行剖析:


1、metamanager 数据库相关逻辑剖析


2、metamanager 业务逻辑剖析

metamanager 数据库相关逻辑剖析

从 eventbus 的模块注册函数入手:


kubeedge/edge/pkg/eventbus/event_bus.go


//constant metamanager module name const (                        MetaManagerModuleName = "metaManager" ) ... // Register register metamanager func Register() {                          dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))               core.Register(&metaManager{}) }
复制代码


在注册函数 Register()中,做了 2 件事:


  1. 在 SQLite 中的数据库中初始化 metaManager 表


dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))
复制代码


  1. 注册已经初始化的 metamanager


core.Register(&metaManager{})
复制代码


下面深入剖析 “1.在 SQLite 中的数据库中初始化 metaManager 表” 相关内容,进入 dbm.RegisterModel(…):


kubeedge/edge/pkg/common/dbm/db.go


//RegisterModel registers the defined model in the orm if model is enabled func RegisterModel(moduleName string, m interface{}) {               if isModuleEnabled(moduleName) {                        orm.RegisterModel(m)                        ...              } else {                       ...            } }

复制代码


RegisterModel(…)函数是对 github.com/astaxie/bee…的封装,本文就不跟进去剖析了,感兴趣的同学可以在本文的基础上自行剖析。


回到“1.在 SQLite 中的数据库中初始化 metaManager 表”,深入剖析 metaManager 表的具体定义 dao.Meta,进入 dao.Meta 定义:


kubeedge/edge/pkg/metamanager/dao/meta.go


// Meta metadata object type Meta struct {              // ID    int64  `orm:"pk; auto; column(id)"`            Key   string `orm:"column(key); size(256); pk"`             Type  string `orm:"column(type); size(32)"`             Value string `orm:"column(value); null; type(text)"`}
复制代码


metaManager 表的具体定义包含 Key、Type 和 Value 三个字段,具体含义如下:


  • Key meta 的名字;

  • Type meta 对应的操作类型;

  • Value 具体的 meta 值;


与 Meta Struct 的定义在同一文件内,还有对 metaManager 表的一些操作定义,如 SaveMeta、DeleteMetaByKey、UpdateMeta、InsertOrUpdate、UpdateMetaField、UpdateMetaFields、QueryMeta、QueryAllMeta,本文不对具体操作的定义进行深入剖析,感兴趣的同学可以在本文的基础上自行剖析。

metamanager 业务逻辑剖析

从 metamanager 的模块启动函数入手:


kubeedge/edge/pkg/metamanager/module.go


func (m *metaManager) Start(c *context.Context) {              m.context = c              InitMetaManagerConfig()              go func() {                           period := getSyncInterval()                           timer := time.NewTimer(period)                          for {                                      select {                                      case <-timer.C:                                                  timer.Reset(period)                                                   msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)                                                  m.context.Send(MetaManagerModuleName, *msg)                                     }                        }            }()            m.mainLoop() }
复制代码


启动函数 Start(…)做了如下 4 件事:


  1. 接收并保存模块启动时传入的*context.Context 实例


m.context = c
复制代码


  1. 初始化 metamanager 配置


InitMetaManagerConfig()
复制代码


  1. 启动一个 goroutine 同步心跳信息


go func() {...}
复制代码


  1. 启动一个循环处理各种事件


m.mainLoop()
复制代码


接下来展开分析 2、3、4。


初始化 metamanager 配置


进入 InitMetaManagerConfig()定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


// InitMetaManagerConfig init meta config func InitMetaManagerConfig() {                    var err error                    groupName, err := config.CONFIG.GetValue("metamanager.context-send-group").ToString()  ...                  edgeSite, err := config.CONFIG.GetValue("metamanager.edgesite").ToBool() ...                  moduleName, err := config.CONFIG.GetValue("metamanager.context-send-module").ToString()... }

复制代码


在初始化 metamanager 配置时,从配置文件中获取了 metamanager.context-send-group、metamanager.edgesite、metamanager.context-send-module,根据获取的值对相关变量进行设置。


启动一个 goroutine 同步心跳信息


kubeedge/edge/pkg/metamanager/module.go


go func() {              period := getSyncInterval()               timer := time.NewTimer(period)               for {                           select {                           case <-timer.C:                                          timer.Reset(period)                                          msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)                                          m.context.Send(MetaManagerModuleName, *msg)                         }                    }           }
复制代码


在同步心跳信息的 goroutine 中,做了如下 2 件事:


  1. 获取通信心跳的时间间隔


period := getSyncInterval()
复制代码


  1. 创建定时器,并定时发送心跳信息


timer := time.NewTimer(period) for {...}
复制代码


启动一个循环处理各种事件


进入 m.mainLoop()定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


func (m *metaManager) mainLoop() {               go func() {                            for {                                                 if msg, err := m.context.Receive(m.Name()); err == nil {                                                 ...                                                m.process(msg)                            } else {                                                 ...                            }             }             }() }

复制代码


mainLoop()函数启动了一个 for 循环,在循环中主要做了 2 件事:


  1. 接收信息


msg, err := m.context.Receive(m.Name())
复制代码


  1. 对接收到的信息进行处理


m.process(msg)
复制代码


想弄明白对信息的处理过程,需要进入 m.process(…)的定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


func (m *metaManager) process(message model.Message) {                    operation := message.GetOperation()              switch operation {              case model.InsertOperation:                                m.processInsert(message)              case model.UpdateOperation:                               m.processUpdate(message)            case model.DeleteOperation:                                m.processDelete(message)            case model.QueryOperation:                                m.processQuery(message)             case model.ResponseOperation:                              m.processResponse(message)             case messagepkg.OperationNodeConnection:                               m.processNodeConnection(message)            case OperationMetaSync:                               m.processSync(message)            case OperationFunctionAction:                               m.processFunctionAction(message)            case OperationFunctionActionResult:                               m.processFunctionActionResult(message)             case constants.CSIOperationTypeCreateVolume,                     constants.CSIOperationTypeDeleteVolume,                    constants.CSIOperationTypeControllerPublishVolume,                    constants.CSIOperationTypeControllerUnpublishVolume:                    m.processVolume(message)}}
复制代码


process(…)函数中主要做了如下 2 件事:


  1. 获取消息的操作的类型


operation := message.GetOperation()
复制代码


  1. 根据信息操作类型对信息进行相应处理


switch operation { ... }
复制代码


信息的操作类型包括 insert、update、delete、query、response、publish、meta-internal-sync、action、action_result 等,本文不对信息的具体处理过程剖析,感兴趣的同学可以在本文的基础上自行剖析。


2020-01-03 10:481871

评论

发布
暂无评论
发现更多内容

但因热爱,愿迎万难,OpenTiny Vue Playground正式上线🎉

OpenTiny社区

开源 Vue 前端

带你认识数仓的监控系统TopSQL

华为云开发者联盟

数据库 后端 华为云 华为云开发者联盟 企业号 8 月 PK 榜

MoE 系列(六)|Envoy Go 扩展之并发安全

SOFAStack

golang 安全 后端 框架 envoy

提升制造业智能化水平——免费MES系统的领航者

万界星空科技

开源 制造业生产管理系统

关于搭建海外社交APP源码的干货

山东布谷网络科技

海外直播源码

JDK1.6在生产环境引起的坑

华为云开发者联盟

开发 华为云 华为云开发者联盟 企业号 8 月 PK 榜

zos静态网站托管的使用和原理

天翼云开发者社区

静态网站托管 静态网站

Spring Cloud实战案例 │ Apollo和Zuul的整合开发

TiAmo

Spring Cloud Apollo Zuul

基于Web3D+GIS智慧森林防火监测预警系统

2D3D前端可视化开发

智慧森林防火 森林火灾预警系统 森林火灾监测系统 森林数字防火 森林智能防火

聊聊 Docker 和 Dockerfile

互联网工科生

Docker 容器 Dockerfile

人人都在聊的IPD(集成产品开发)到底是什么?一文让你读懂TA!

禅道项目管理

华为 项目管理 软件 硬件

天翼云加入云原生安全实验室,推进行业标准制定和生态建设!

天翼云开发者社区

云计算 云原生

【华秋推荐】物联网入门学习模块 ESP8266

华秋电子

物联网

为什么马斯克和奥特曼都想重振加密货币?

树上有只程序猿

人工智能 AGI

HPC中常见的调度器介绍

天翼云开发者社区

云计算 HPC 高性能计算

高效构建 vivo 企业级网络流量分析系统

vivo互联网技术

流量采集 流量监控 sFlow

小灯塔系列-中小企业数字化转型系列研究-ERP测评报告

向量智库

小灯塔系列|中小企业数字化转型系列研究——IM测评报告

向量智库

小灯塔系列|中小企业数字化转型系列研究——云存储测评报告

向量智库

不要仅限于只做测试工作

老张

软件测试 职场成长

风很大的“云数仓”到底怎么用?三家企业交出答卷

字节跳动数据平台

数据库 云原生 企业号 8 月 PK 榜

“科创中国”青百会轮值主席吴甜:以大语言模型为代表的AI将引发产业变革

飞桨PaddlePaddle

人工智能 百度 paddle 飞桨 百度飞桨

基于分级安全的OpenHarmony架构设计

OpenHarmony开发者

OpenHarmony

物理机是什么?有什么优势?可以上堡垒机吗?

行云管家

等保 堡垒机 等级保护 物理机

MoE 系列(七)| Envoy Go 扩展之沙箱安全

SOFAStack

golang 后端 框架 envoy 安全沙箱

下一代MES系统架构分析与选型参考

华为云开发者联盟

云计算 后端 华为云 华为云开发者联盟 企业号 8 月 PK 榜

KubeEdge源码分析之(六)metamanager_架构_华为云原生团队_InfoQ精选文章