写点什么

Kubernetes 中的事件处理机制

tianfeiyu

  • 2019-12-31
  • 本文字数:5559 字

    阅读完需:约 18 分钟

Kubernetes 中的事件处理机制

前言

当集群中的 node 或 pod 异常时,大部分用户会使用 kubectl 查看对应的 events,那么 events 是从何而来?


其实 K8s 中的各个组件会将运行时产生的各种事件汇报到 apiserver,对于 K8s 中的可描述资源,使用 kubectl describe 都可以看到其相关的 events,那 K8s 中又有哪几个组件都上报 events 呢?


只要在 k8s.io/kubernetes/cmd 目录下暴力搜索一下就能知道哪些组件会产生 events:



可以看出,controller-manage、kube-proxy、kube-scheduler、kubelet 都使用了 EventRecorder,本文只讲述 kubelet 中对 Events 的使用。


01


Events 的定义


events 在 k8s.io/api/core/v1/types.go 中进行定义,结构体如下所示:



image.png


其中 InvolvedObject 代表和事件关联的对象,source 代表事件源,使用 kubectl 看到的事件一般包含 Type、Reason、Age、From、Message 几个字段。


K8s 中 events 目前只有两种类型:“Normal” 和 “Warning”:



events 的两种类型


02


EventBroadcaster 的初始化


events 的整个生命周期都与 EventBroadcaster 有关,kubelet 中对 EventBroadcaster 的初始化在 k8s.io/kubernetes/cmd/kubelet/app/server.go 中:


func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {



// event 初始化


makeEventRecorder(kubeDeps, nodeName)



}


func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {


if kubeDeps.Recorder != nil { return }


// 初始化 EventBroadcaster


eventBroadcaster := record.NewBroadcaster()


// 初始化 EventRecorder


kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})


// 记录 events 到本地日志


eventBroadcaster.StartLogging(glog.V(3).Infof)


if kubeDeps.EventClient != nil {


glog.V(4).Infof(“Sending events to api server.”)


// 上报 events 到 apiserver


eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})


} else {


glog.Warning(“No api server defined - no events will be sent to API server.”)


}


}Kubelet 在启动的时候会初始化一个 EventBroadcaster,它主要是对接收到的 events 做一些后续的处理(保存、上报等),EventBroadcaster 也会被 kubelet 中的其他模块使用,以下是相关的定义,对 events 生成和处理的函数都定义在


k8s.io/client-go/tools/record/event.go 中:



EventBroadcaster 是个接口类型,该接口有以下四个方法:


  • StartEventWatcher():EventBroadcaster 中的核心方法,接收各模块产生的 events,参数为一个处理 events 的函数,用户可以使用 StartEventWatcher() 接收 events 然后使用自定义的 handle 进行处理

  • StartRecordingToSink():调用

  • StartEventWatcher() :接收 events,并将收到的 events 发送到 apiserver

  • StartLogging():也是调用 StartEventWatcher() 接收 events,然后保存 events 到日志

  • NewRecorder():会创建一个指定 EventSource 的 EventRecorder,EventSource 指明了哪个节点的哪个组件


eventBroadcasterImpl 是 eventBroadcaster 实际的对象,初始化 EventBroadcaster 对象的时候会初始化一个 Broadcaster,Broadcaster 会启动一个 goroutine 接收各组件产生的 events 并广播到每一个 watcher。


func NewBroadcaster() EventBroadcaster {


return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}


}


可以看到,kubelet 在初始化完 EventBroadcaster 后会调用 StartRecordingToSink() 和 StartLogging() 两个方法,StartRecordingToSink() 处理函数会将收到的 events 进行缓存、过滤、聚合而后发送到 apiserver,StartLogging() 仅将 events 保存到 kubelet 的日志中。


03


Events 的生成


从初始化 EventBroadcaster 的代码中可以看到 kubelet 在初始化完 EventBroadcaster 后紧接着初始化了 EventRecorder,并将已经初始化的 Broadcaster 对象作为参数传给了 EventRecorder,至此,EventBroadcaster、EventRecorder、Broadcaster 三个对象产生了关联。EventRecorder 的主要功能是生成指定格式的 events,以下是相关的定义:


type recorderImpl struct {


scheme *runtime.Scheme


source v1.EventSource


*watch.Broadcaster


clock clock.Clock


}


type EventRecorder interface {


Event(object runtime.Object, eventtype, reason, message string) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args …interface{})


PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args …interface{})


AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args …interface{})


}EventRecorder 中包含的几个方法都是产生指定格式的 events,Event() 和 Eventf() 的功能类似 fmt.Println() 和 fmt.Printf(),kubelet 中的各个模块会调用 EventRecorder 生成 events。recorderImpl 是 EventRecorder 实际的对象。EventRecorder 的每个方法会调用 generateEvent,在 generateEvent 中初始化 events 。


以下是生成 events 的函数:


func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {


ref, err := ref.GetReference(recorder.scheme, object)


if err != nil {


glog.Errorf(“Could not construct reference to: ‘%#v’ due to: ‘%v’. Will not report event: ‘%v’ ‘%v’ ‘%v’”, object, err, eventtype, reason, message)


return }


if !validateEventType(eventtype) {


glog.Errorf(“Unsupported event type: ‘%v’”, eventtype) return }


event := recorder.makeEvent(ref, annotations, eventtype, reason, message)


event.Source = recorder.source go func() {


// NOTE: events should be a non-blocking operation


defer utilruntime.HandleCrash() // 发送事件 recorder.Action(watch.Added, event)


}()


}func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {


t := metav1.Time{Time: recorder.clock.Now()}


namespace := ref.Namespace


if namespace == “” {


namespace = metav1.NamespaceDefault


}


return &v1.Event{


ObjectMeta: metav1.ObjectMeta{


Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),


Namespace: namespace,


Annotations: annotations,


},


InvolvedObject: *ref,


Reason: reason,


Message: message,


FirstTimestamp: t,


LastTimestamp: t,


Count: 1,


Type: eventtype,初始化完 events 后会调用 recorder.Action() 将 events 发送到 Broadcaster 的事件接收队列中, Action() 是 Broadcaster 中的方法。


以下是 Action() 方法的实现:



04


Events 的广播


上面已经说了,EventBroadcaster 初始化时会初始化一个 Broadcaster,Broadcaster 的作用就是接收所有的 events 并进行广播,Broadcaster 的实现在 k8s.io/apimachinery/pkg/watch/mux.go 中,Broadcaster 初始化完成后会在后台启动一个 goroutine,然后接收所有从 EventRecorder 发送过来的 events,Broadcaster 中有一个 map 会保存每一个注册的 watcher, 接着将 events 广播给所有的 watcher,每个 watcher 都有一个接收消息的 channel,watcher 可以通过它的 ResultChan() 方法从 channel 中读取数据进行消费。


以下是 Broadcaster 广播 events 的实现:



05


Events 的处理


那么 watcher 是从何而来呢?每一个要处理 events 的 client 都需要初始化一个 watcher,处理 events 的方法是在 EventBroadcaster 中定义的,以下是 EventBroadcaster 中对 events 处理的三个函数:


func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {


watcher := eventBroadcaster.Watch()


go func() {


defer utilruntime.HandleCrash()


for watchEvent := range watcher.ResultChan() {


event, ok := watchEvent.Object.(*v1.Event)


if !ok {


// This is all local, so there’s no reason this should


// ever happen.


continue


}


eventHandler(event)


}


}()


return watcher


StartEventWatcher() 首先实例化一个 watcher,每个 watcher 都会被塞入到 Broadcaster 的 watcher 列表中,watcher 从 Broadcaster 提供的 channel 中读取 events,然后再调用 eventHandler 进行处理,StartLogging() 和 StartRecordingToSink() 都是对 StartEventWatcher() 的封装,都会传入自己的处理函数。


func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args …interface{})) watch.Interface {


return eventBroadcaster.StartEventWatcher(


func(e *v1.Event) {


logf(“Event(%#v): type: ‘%v’ reason: ‘%v’ %v”, e.InvolvedObject, e.Type, e.Reason, e.Message)


})


StartLogging() 传入的 eventHandler 仅将 events 保存到日志中。


func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {


// The default math/rand package functions aren’t thread safe, so create a


// new Rand object for each StartRecording call.


randGen := rand.New(rand.NewSource(time.Now().UnixNano()))


eventCorrelator := NewEventCorrelator(clock.RealClock{})


return eventBroadcaster.StartEventWatcher(


func(event *v1.Event) {


recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)


})


}


func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {


eventCopy := *event


event = &eventCopy


result, err := eventCorrelator.EventCorrelate(event)


if err != nil {


utilruntime.HandleError(err)


}


if result.Skip { return }


tries := 0


for {


if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {


break


}


tries++


if tries >= maxTriesPerEvent {


glog.Errorf(“Unable to write event ‘%#v’ (retry limit exceeded!)”, event)


break


}


// 第一次重试增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件


if tries == 1 {


time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))


} else {


time.Sleep(sleepDuration)


}


}StartRecordingToSink() 方法先根据当前时间生成一个随机数发生器 randGen,增加随机数是为了在重试时增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件,接着实例化一个 EventCorrelator,EventCorrelator 会对事件做一些预处理的工作,其中包括过滤、聚合、缓存等操作,具体代码不做详细分析,最后将 recordToSink() 函数作为处理函数,recordToSink() 会将处理后的 events 发送到 apiserver,这是 StartEventWatcher() 的整个工作流程。


06


Events 简单实现


了解完 events 的整个处理流程后,可以参考其实现方式写一个 demo,要实现一个完整的 events 需要包含以下几个功能:


  1. 事件的产生

  2. 事件的发送

  3. 事件广播

  4. 事件缓存

  5. 事件过滤和聚合







此处仅简单实现,将 EventRecorder 处理 events 的功能直接放在了 EventBroadcaster 中实现,对 events 的处理方法仅实现了 StartLogging(),Broadcaster 中的部分功能是直接复制 K8s 中的代码,有一定的精简,其实现值得学习,此处对 EventCorrelator 并没有进行实现。


代码请参考:


https://github.com/gosoon/k8s-learning-notes/tree/master/k8s-package/events


07


总结


本文讲述了 K8s 中 events 从产生到展示的一个完整过程,最后也实现了一个简单的 demo,在此将 kubelet 对 events 的整个处理过程再梳理下,其中主要有三个对象 EventBroadcaster、EventRecorder、Broadcaster:


  • kubelet 首先会初始化 EventBroadcaster 对象,同时会初始化一个 Broadcaster 对象。

  • kubelet 通过 EventBroadcaster 对象的 NewRecorder() 方法初始化 EventRecorder 对象,EventRecorder 对象提供的几个方法会生成 events 并通过 Action() 方法发送 events 到 Broadcaster 的 channel 队列中。

  • Broadcaster 的作用就是接收所有的 events 并进行广播,Broadcaster 初始化后会在后台启动一个 goroutine,然后接收所有从 EventRecorder 发来的 events。

  • EventBroadcaster 对 events 有三个处理方法:

  • StartEventWatcher()

  • StartRecordingToSink()

  • StartLogging(),StartEventWatcher() 是其中的核心方法,会初始化一个 watcher 注册到 Broadcaster,其余两个处理函数对 StartEventWatcher() 进行了封装,并实现了自己的处理函数。

  • Broadcaster 中有一个 map 会保存每一个注册的 watcher,其会将所有的 events 广播给每一个 watcher,每个 watcher 通过它的 ResultChan() 方法从 channel 接收 events。

  • kubelet 会使用 StartRecordingToSink() 和 StartLogging() 对 events 进行处理,StartRecordingToSink() 处理函数收到 events 后会进行缓存、过滤、聚合而后发送到 apiserver,apiserver 会将 events 保存到 etcd 中,使用 kubectl 或其他客户端可以查看。StartLogging() 仅将 events 保存到 kubelet 的日志中。


第四课:KubeEdge 设备管理设计原理


晚 8:00 直播


识别下图二维码,加群获取课程材料



2019-12-31 16:462321

评论

发布
暂无评论
发现更多内容

云上可靠性测试:让我们一起给开发找点事儿

华为云开发者联盟

安全 云服务 可靠性

K8S 资源可视化利器:Kubectl-Graph

郭旭东

Kubernetes Kubernetes Plugin

CAP 原理 <笔记>

raox

极客大学架构师训练营

与前端训练营的日子 --Week09

SamGo

学习

为移动应用产业开辟出海新航路,华为应用市场是如何“破冰”的?

脑极体

Demo分享丨看ModelArts与HiLens是如何让车自己跑起来的

华为云开发者联盟

人工智能 智能车 hilens

测开之函数进阶· 第4篇《匿名函数》

清菡软件测试

测试开发

LeetCode题解:剑指 Offer 40. 最小的k个数,快速排序,JavaScript,详细注释

Lee Chen

算法 大前端 LeetCode

冰河又一MySQL力作出版(文末送书)!!

冰河

MySQL 高可用 高并发 高性能 MySQL架构

SpringBoot,来实现MySQL读写分离技术

Java架构师迁哥

在wildfly 21中搭建cluster集群

程序那些事

程序那些事 wildfly wildfly21 集群部署 集群架构

架构大作业二

Geek_michael

极客大学架构师训练营

7. JDK拍了拍你:字符串拼接一定记得用MessageFormat#format

YourBatman

Spring Framework 类型转换 MessageFormat DateFormat

姐夫半夜不睡觉,竟躲在厕所看这“57道Redis面试题”?

Java架构之路

Java 程序员 架构 面试 编程语言

JAVA并发编程原理与实战

Geek_53983e

原理 java 并发 实战

架构师训练营 - 大作业 2

阿甘

二十多岁的年纪是怎么成功四面字节跳动,最终拿到offer的?

Java架构之路

Java 程序员 架构 面试 编程语言

Java岗四面字节跳动成功之前,我都刷了那些面试题以及做了那些准备!

Java架构之路

Java 程序员 架构 面试 编程语言

架构大作业一

Geek_michael

极客大学架构师训练营

面试官:Android事件分发机制及设计思路,跳槽薪资翻倍

欢喜学安卓

android 程序员 面试 移动开发

手把手教你写!2021年Android工作或更难找,最全的BAT大厂面试题整理

欢喜学安卓

android 程序员 面试 移动开发

速来围观!阿里P8大牛写出的JDK源码剖析及大型网站技术架构与业务架构融合之道

Java架构之路

Java 程序员 架构 面试 编程语言

支持 gRPC 长链接,深度解读 Nacos 2.0 架构设计及新模型

阿里巴巴云原生

云计算 阿里云 开源 微服务 云原生

扫地阿姨看完都学会了!万字长文总结Android多进程,满满干货指导

欢喜学安卓

android 程序员 面试 移动开发

架构师训练营第五周”技术选型一“作业

随秋

极客大学架构师训练营

GitHub标星力推!我掏空了各大搜索引擎,给你整理了188道Java面试题,满满干货记得收藏

Java架构之路

Java 程序员 架构 面试 编程语言

自研ARM芯片,亲手拆掉Wintel联盟,微软这次是认真的吗?

脑极体

突破2.8万美元关口,比特币为何“疯涨”? ​

CECBC

比特币 比特币数字货币

架构师训练营 - 大作业1

阿甘

甲方日常 76

句子

工作 随笔杂谈 日常

Spring Cloud 2020.0.0 正式发布,对开发者来说意味着什么?

阿里巴巴云原生

阿里云 容器 开发者 云原生 架构师

Kubernetes 中的事件处理机制_服务革新_InfoQ精选文章