写点什么

KubeEdge 源码分析之(六)metamanager

  • 2020-01-03
  • 本文字数:3874 字

    阅读完需:约 13 分钟

KubeEdge源码分析之(六)metamanager

本系列的源码分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上进行的。

cloudcore 部分的源码分析是在 kubeedge 源码分析系列之整体架构基础上展开的,如果没有阅读过 kubeedge 源码分析系列之整体架构,直接阅读本文,会感觉比较突兀。


本文对 edgecore 的 metamanager 模块进行剖析,metamanager 作为 edgecore 中的 edged 模块与 edgehub 模块进行交互的桥梁,除了将 edgehub 的消息转发给 edged,还对一些必要的数据通过 SQLite 进行了缓存,在某种程度上实现了 kubeedge 的 offline mode。本文就对 metamanager 所涉及的 SQLite 数据库相关逻辑和业务逻辑进行剖析:


1、metamanager 数据库相关逻辑剖析


2、metamanager 业务逻辑剖析

metamanager 数据库相关逻辑剖析

从 eventbus 的模块注册函数入手:


kubeedge/edge/pkg/eventbus/event_bus.go


//constant metamanager module name const (                        MetaManagerModuleName = "metaManager" ) ... // Register register metamanager func Register() {                          dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))               core.Register(&metaManager{}) }
复制代码


在注册函数 Register()中,做了 2 件事:


  1. 在 SQLite 中的数据库中初始化 metaManager 表


dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))
复制代码


  1. 注册已经初始化的 metamanager


core.Register(&metaManager{})
复制代码


下面深入剖析 “1.在 SQLite 中的数据库中初始化 metaManager 表” 相关内容,进入 dbm.RegisterModel(…):


kubeedge/edge/pkg/common/dbm/db.go


//RegisterModel registers the defined model in the orm if model is enabled func RegisterModel(moduleName string, m interface{}) {               if isModuleEnabled(moduleName) {                        orm.RegisterModel(m)                        ...              } else {                       ...            } }

复制代码


RegisterModel(…)函数是对 github.com/astaxie/bee…的封装,本文就不跟进去剖析了,感兴趣的同学可以在本文的基础上自行剖析。


回到“1.在 SQLite 中的数据库中初始化 metaManager 表”,深入剖析 metaManager 表的具体定义 dao.Meta,进入 dao.Meta 定义:


kubeedge/edge/pkg/metamanager/dao/meta.go


// Meta metadata object type Meta struct {              // ID    int64  `orm:"pk; auto; column(id)"`            Key   string `orm:"column(key); size(256); pk"`             Type  string `orm:"column(type); size(32)"`             Value string `orm:"column(value); null; type(text)"`}
复制代码


metaManager 表的具体定义包含 Key、Type 和 Value 三个字段,具体含义如下:


  • Key meta 的名字;

  • Type meta 对应的操作类型;

  • Value 具体的 meta 值;


与 Meta Struct 的定义在同一文件内,还有对 metaManager 表的一些操作定义,如 SaveMeta、DeleteMetaByKey、UpdateMeta、InsertOrUpdate、UpdateMetaField、UpdateMetaFields、QueryMeta、QueryAllMeta,本文不对具体操作的定义进行深入剖析,感兴趣的同学可以在本文的基础上自行剖析。

metamanager 业务逻辑剖析

从 metamanager 的模块启动函数入手:


kubeedge/edge/pkg/metamanager/module.go


func (m *metaManager) Start(c *context.Context) {              m.context = c              InitMetaManagerConfig()              go func() {                           period := getSyncInterval()                           timer := time.NewTimer(period)                          for {                                      select {                                      case <-timer.C:                                                  timer.Reset(period)                                                   msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)                                                  m.context.Send(MetaManagerModuleName, *msg)                                     }                        }            }()            m.mainLoop() }
复制代码


启动函数 Start(…)做了如下 4 件事:


  1. 接收并保存模块启动时传入的*context.Context 实例


m.context = c
复制代码


  1. 初始化 metamanager 配置


InitMetaManagerConfig()
复制代码


  1. 启动一个 goroutine 同步心跳信息


go func() {...}
复制代码


  1. 启动一个循环处理各种事件


m.mainLoop()
复制代码


接下来展开分析 2、3、4。


初始化 metamanager 配置


进入 InitMetaManagerConfig()定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


// InitMetaManagerConfig init meta config func InitMetaManagerConfig() {                    var err error                    groupName, err := config.CONFIG.GetValue("metamanager.context-send-group").ToString()  ...                  edgeSite, err := config.CONFIG.GetValue("metamanager.edgesite").ToBool() ...                  moduleName, err := config.CONFIG.GetValue("metamanager.context-send-module").ToString()... }

复制代码


在初始化 metamanager 配置时,从配置文件中获取了 metamanager.context-send-group、metamanager.edgesite、metamanager.context-send-module,根据获取的值对相关变量进行设置。


启动一个 goroutine 同步心跳信息


kubeedge/edge/pkg/metamanager/module.go


go func() {              period := getSyncInterval()               timer := time.NewTimer(period)               for {                           select {                           case <-timer.C:                                          timer.Reset(period)                                          msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)                                          m.context.Send(MetaManagerModuleName, *msg)                         }                    }           }
复制代码


在同步心跳信息的 goroutine 中,做了如下 2 件事:


  1. 获取通信心跳的时间间隔


period := getSyncInterval()
复制代码


  1. 创建定时器,并定时发送心跳信息


timer := time.NewTimer(period) for {...}
复制代码


启动一个循环处理各种事件


进入 m.mainLoop()定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


func (m *metaManager) mainLoop() {               go func() {                            for {                                                 if msg, err := m.context.Receive(m.Name()); err == nil {                                                 ...                                                m.process(msg)                            } else {                                                 ...                            }             }             }() }

复制代码


mainLoop()函数启动了一个 for 循环,在循环中主要做了 2 件事:


  1. 接收信息


msg, err := m.context.Receive(m.Name())
复制代码


  1. 对接收到的信息进行处理


m.process(msg)
复制代码


想弄明白对信息的处理过程,需要进入 m.process(…)的定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


func (m *metaManager) process(message model.Message) {                    operation := message.GetOperation()              switch operation {              case model.InsertOperation:                                m.processInsert(message)              case model.UpdateOperation:                               m.processUpdate(message)            case model.DeleteOperation:                                m.processDelete(message)            case model.QueryOperation:                                m.processQuery(message)             case model.ResponseOperation:                              m.processResponse(message)             case messagepkg.OperationNodeConnection:                               m.processNodeConnection(message)            case OperationMetaSync:                               m.processSync(message)            case OperationFunctionAction:                               m.processFunctionAction(message)            case OperationFunctionActionResult:                               m.processFunctionActionResult(message)             case constants.CSIOperationTypeCreateVolume,                     constants.CSIOperationTypeDeleteVolume,                    constants.CSIOperationTypeControllerPublishVolume,                    constants.CSIOperationTypeControllerUnpublishVolume:                    m.processVolume(message)}}
复制代码


process(…)函数中主要做了如下 2 件事:


  1. 获取消息的操作的类型


operation := message.GetOperation()
复制代码


  1. 根据信息操作类型对信息进行相应处理


switch operation { ... }
复制代码


信息的操作类型包括 insert、update、delete、query、response、publish、meta-internal-sync、action、action_result 等,本文不对信息的具体处理过程剖析,感兴趣的同学可以在本文的基础上自行剖析。


2020-01-03 10:481672

评论

发布
暂无评论
发现更多内容

什么是“语法糖”?Java中有哪些常见糖?

Java你猿哥

Java ssm Java工程师 语法糖

阿里巴巴为什么不建议直接使用@Async注解?

Java你猿哥

Java ssm java8 Async Java工程师

微服务架构下你不得不知的3种部署策略

做梦都在改BUG

Java 架构 微服务

运动健康路线导入,助力用户轻松导航

HarmonyOS SDK

HMS Core

接通率维持66%以上,为什么火山引擎VeDI能让企业智能外呼不再难?

字节跳动数据平台

营销 用户增长 业务增长 客户数据 企业号 3 月 PK 榜

干货分享|袋鼠云数栈离线开发平台在小文件治理上的探索实践之路

袋鼠云数栈

大数据 平台开发

MySQL 语句中 where 条件后为什么写上1=1 , 是什么意思?

Java你猿哥

Java MySQL sql 后端 ssm

太厉害了!腾讯T4大牛把《数据结构与算法》讲透了,带源码笔记

Java你猿哥

Java 数据结构 算法 数据结构算法 左程云

马鞍山等级测评机构有哪些?有几家?在哪里?

行云管家

等保测评 等级测评 马鞍山

凭借左程云(左神)的这份 “程序员代码面试指南”我入职了字节

Java你猿哥

Java 算法 数组 二叉树 面经

是找茬? 还是装 B?阿里面试每轮必问的“Spring Boot”意义何在?

三十而立

弯道超车!阿里高工新产Java面试速成指南,面试骚操作都在里面了

Java你猿哥

Java 面试 面经 Java工程师 春招

低代码平台搭建CRM 加速重构业务模式

力软低代码开发平台

数据库 CI/CD 工具 -- Bytebase 介绍

Se7en

Bytebase vs Flyway

Bytebase

数据库 版本控制 变更

LED透明屏私人定制势不可挡

Dylan

电子 LED显示屏 屏幕

AI + Kubernetes 赋能DevSecOps 的思考

HummerCloud

人工智能 Kubernetes DevOps

最佳实践 | 用腾讯云智能语音打造智能对话机器人

牵着蜗牛去散步

腾讯云 腾讯 语音识别 语音合成 智能对话机器人

从 1000+ 参赛项目突围,涛思数据荣获 ITEC 2022 全球创业赛成长组二等奖

TDengine

tdengine 物联网 时序数据库 数字经济 大数据 开源

在 Kubernetes 中部署应用交付服务(第 2 部分)

NGINX开源社区

nginx Kubernetes

厉害了!阿里内部都用的Spring+MyBatis源码手册,实战理论两不误

Java你猿哥

spring 面试 Spring Boot mybatis 面经

机器学习算法(九): 基于线性判别模型的LDA手写数字分类识别

汀丶人工智能

人工智能 数据挖掘 机器学习 LDA算法

Dubbo 正式支持 Spring 6 & Spring Boot 3

Java你猿哥

Java spring Spring Boot dubbo ssm

不懂就问:MySQL delete 表数据,磁盘空间为什么没有被释放?

Java你猿哥

Java MySQL 数据库 innodb Java工程师

数据出境是什么意思?我国数据出境合规要求是什么?

行云管家

数据 数据安全 堡垒机 数据出境

2023字节、腾讯、阿里等6家大厂Java开发面试真题+高频面试题总结

小小怪下士

Java java程序员 java面试 Java面试题

限时公开,2023 年阿里巴巴 Java 面试权威指南(全彩版)

架构师之道

Java 面试

置顶两个月!《程序员如何向架构师转型》神作在Github持续霸榜

做梦都在改BUG

Java 程序员 系统设计 架构师

用注解的方式优雅实现Ression分布式锁

做梦都在改BUG

Java 分布式锁 Ression

手慢无!阿里云神作《Spring Boot进阶原理实战》真的太全了!

做梦都在改BUG

Java 微服务 Spring Boot 框架

KubeEdge源码分析之(六)metamanager_架构_华为云原生团队_InfoQ精选文章