速来报名!AICon北京站鸿蒙专场~ 了解详情
写点什么

Kubernetes 中的事件处理机制

tianfeiyu

  • 2019-12-31
  • 本文字数:5559 字

    阅读完需:约 18 分钟

Kubernetes 中的事件处理机制

前言

当集群中的 node 或 pod 异常时,大部分用户会使用 kubectl 查看对应的 events,那么 events 是从何而来?


其实 K8s 中的各个组件会将运行时产生的各种事件汇报到 apiserver,对于 K8s 中的可描述资源,使用 kubectl describe 都可以看到其相关的 events,那 K8s 中又有哪几个组件都上报 events 呢?


只要在 k8s.io/kubernetes/cmd 目录下暴力搜索一下就能知道哪些组件会产生 events:



可以看出,controller-manage、kube-proxy、kube-scheduler、kubelet 都使用了 EventRecorder,本文只讲述 kubelet 中对 Events 的使用。


01


Events 的定义


events 在 k8s.io/api/core/v1/types.go 中进行定义,结构体如下所示:



image.png


其中 InvolvedObject 代表和事件关联的对象,source 代表事件源,使用 kubectl 看到的事件一般包含 Type、Reason、Age、From、Message 几个字段。


K8s 中 events 目前只有两种类型:“Normal” 和 “Warning”:



events 的两种类型


02


EventBroadcaster 的初始化


events 的整个生命周期都与 EventBroadcaster 有关,kubelet 中对 EventBroadcaster 的初始化在 k8s.io/kubernetes/cmd/kubelet/app/server.go 中:


func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {



// event 初始化


makeEventRecorder(kubeDeps, nodeName)



}


func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {


if kubeDeps.Recorder != nil { return }


// 初始化 EventBroadcaster


eventBroadcaster := record.NewBroadcaster()


// 初始化 EventRecorder


kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})


// 记录 events 到本地日志


eventBroadcaster.StartLogging(glog.V(3).Infof)


if kubeDeps.EventClient != nil {


glog.V(4).Infof(“Sending events to api server.”)


// 上报 events 到 apiserver


eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})


} else {


glog.Warning(“No api server defined - no events will be sent to API server.”)


}


}Kubelet 在启动的时候会初始化一个 EventBroadcaster,它主要是对接收到的 events 做一些后续的处理(保存、上报等),EventBroadcaster 也会被 kubelet 中的其他模块使用,以下是相关的定义,对 events 生成和处理的函数都定义在


k8s.io/client-go/tools/record/event.go 中:



EventBroadcaster 是个接口类型,该接口有以下四个方法:


  • StartEventWatcher():EventBroadcaster 中的核心方法,接收各模块产生的 events,参数为一个处理 events 的函数,用户可以使用 StartEventWatcher() 接收 events 然后使用自定义的 handle 进行处理

  • StartRecordingToSink():调用

  • StartEventWatcher() :接收 events,并将收到的 events 发送到 apiserver

  • StartLogging():也是调用 StartEventWatcher() 接收 events,然后保存 events 到日志

  • NewRecorder():会创建一个指定 EventSource 的 EventRecorder,EventSource 指明了哪个节点的哪个组件


eventBroadcasterImpl 是 eventBroadcaster 实际的对象,初始化 EventBroadcaster 对象的时候会初始化一个 Broadcaster,Broadcaster 会启动一个 goroutine 接收各组件产生的 events 并广播到每一个 watcher。


func NewBroadcaster() EventBroadcaster {


return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}


}


可以看到,kubelet 在初始化完 EventBroadcaster 后会调用 StartRecordingToSink() 和 StartLogging() 两个方法,StartRecordingToSink() 处理函数会将收到的 events 进行缓存、过滤、聚合而后发送到 apiserver,StartLogging() 仅将 events 保存到 kubelet 的日志中。


03


Events 的生成


从初始化 EventBroadcaster 的代码中可以看到 kubelet 在初始化完 EventBroadcaster 后紧接着初始化了 EventRecorder,并将已经初始化的 Broadcaster 对象作为参数传给了 EventRecorder,至此,EventBroadcaster、EventRecorder、Broadcaster 三个对象产生了关联。EventRecorder 的主要功能是生成指定格式的 events,以下是相关的定义:


type recorderImpl struct {


scheme *runtime.Scheme


source v1.EventSource


*watch.Broadcaster


clock clock.Clock


}


type EventRecorder interface {


Event(object runtime.Object, eventtype, reason, message string) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args …interface{})


PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args …interface{})


AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args …interface{})


}EventRecorder 中包含的几个方法都是产生指定格式的 events,Event() 和 Eventf() 的功能类似 fmt.Println() 和 fmt.Printf(),kubelet 中的各个模块会调用 EventRecorder 生成 events。recorderImpl 是 EventRecorder 实际的对象。EventRecorder 的每个方法会调用 generateEvent,在 generateEvent 中初始化 events 。


以下是生成 events 的函数:


func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {


ref, err := ref.GetReference(recorder.scheme, object)


if err != nil {


glog.Errorf(“Could not construct reference to: ‘%#v’ due to: ‘%v’. Will not report event: ‘%v’ ‘%v’ ‘%v’”, object, err, eventtype, reason, message)


return }


if !validateEventType(eventtype) {


glog.Errorf(“Unsupported event type: ‘%v’”, eventtype) return }


event := recorder.makeEvent(ref, annotations, eventtype, reason, message)


event.Source = recorder.source go func() {


// NOTE: events should be a non-blocking operation


defer utilruntime.HandleCrash() // 发送事件 recorder.Action(watch.Added, event)


}()


}func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {


t := metav1.Time{Time: recorder.clock.Now()}


namespace := ref.Namespace


if namespace == “” {


namespace = metav1.NamespaceDefault


}


return &v1.Event{


ObjectMeta: metav1.ObjectMeta{


Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),


Namespace: namespace,


Annotations: annotations,


},


InvolvedObject: *ref,


Reason: reason,


Message: message,


FirstTimestamp: t,


LastTimestamp: t,


Count: 1,


Type: eventtype,初始化完 events 后会调用 recorder.Action() 将 events 发送到 Broadcaster 的事件接收队列中, Action() 是 Broadcaster 中的方法。


以下是 Action() 方法的实现:



04


Events 的广播


上面已经说了,EventBroadcaster 初始化时会初始化一个 Broadcaster,Broadcaster 的作用就是接收所有的 events 并进行广播,Broadcaster 的实现在 k8s.io/apimachinery/pkg/watch/mux.go 中,Broadcaster 初始化完成后会在后台启动一个 goroutine,然后接收所有从 EventRecorder 发送过来的 events,Broadcaster 中有一个 map 会保存每一个注册的 watcher, 接着将 events 广播给所有的 watcher,每个 watcher 都有一个接收消息的 channel,watcher 可以通过它的 ResultChan() 方法从 channel 中读取数据进行消费。


以下是 Broadcaster 广播 events 的实现:



05


Events 的处理


那么 watcher 是从何而来呢?每一个要处理 events 的 client 都需要初始化一个 watcher,处理 events 的方法是在 EventBroadcaster 中定义的,以下是 EventBroadcaster 中对 events 处理的三个函数:


func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {


watcher := eventBroadcaster.Watch()


go func() {


defer utilruntime.HandleCrash()


for watchEvent := range watcher.ResultChan() {


event, ok := watchEvent.Object.(*v1.Event)


if !ok {


// This is all local, so there’s no reason this should


// ever happen.


continue


}


eventHandler(event)


}


}()


return watcher


StartEventWatcher() 首先实例化一个 watcher,每个 watcher 都会被塞入到 Broadcaster 的 watcher 列表中,watcher 从 Broadcaster 提供的 channel 中读取 events,然后再调用 eventHandler 进行处理,StartLogging() 和 StartRecordingToSink() 都是对 StartEventWatcher() 的封装,都会传入自己的处理函数。


func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args …interface{})) watch.Interface {


return eventBroadcaster.StartEventWatcher(


func(e *v1.Event) {


logf(“Event(%#v): type: ‘%v’ reason: ‘%v’ %v”, e.InvolvedObject, e.Type, e.Reason, e.Message)


})


StartLogging() 传入的 eventHandler 仅将 events 保存到日志中。


func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {


// The default math/rand package functions aren’t thread safe, so create a


// new Rand object for each StartRecording call.


randGen := rand.New(rand.NewSource(time.Now().UnixNano()))


eventCorrelator := NewEventCorrelator(clock.RealClock{})


return eventBroadcaster.StartEventWatcher(


func(event *v1.Event) {


recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)


})


}


func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {


eventCopy := *event


event = &eventCopy


result, err := eventCorrelator.EventCorrelate(event)


if err != nil {


utilruntime.HandleError(err)


}


if result.Skip { return }


tries := 0


for {


if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {


break


}


tries++


if tries >= maxTriesPerEvent {


glog.Errorf(“Unable to write event ‘%#v’ (retry limit exceeded!)”, event)


break


}


// 第一次重试增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件


if tries == 1 {


time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))


} else {


time.Sleep(sleepDuration)


}


}StartRecordingToSink() 方法先根据当前时间生成一个随机数发生器 randGen,增加随机数是为了在重试时增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件,接着实例化一个 EventCorrelator,EventCorrelator 会对事件做一些预处理的工作,其中包括过滤、聚合、缓存等操作,具体代码不做详细分析,最后将 recordToSink() 函数作为处理函数,recordToSink() 会将处理后的 events 发送到 apiserver,这是 StartEventWatcher() 的整个工作流程。


06


Events 简单实现


了解完 events 的整个处理流程后,可以参考其实现方式写一个 demo,要实现一个完整的 events 需要包含以下几个功能:


  1. 事件的产生

  2. 事件的发送

  3. 事件广播

  4. 事件缓存

  5. 事件过滤和聚合







此处仅简单实现,将 EventRecorder 处理 events 的功能直接放在了 EventBroadcaster 中实现,对 events 的处理方法仅实现了 StartLogging(),Broadcaster 中的部分功能是直接复制 K8s 中的代码,有一定的精简,其实现值得学习,此处对 EventCorrelator 并没有进行实现。


代码请参考:


https://github.com/gosoon/k8s-learning-notes/tree/master/k8s-package/events


07


总结


本文讲述了 K8s 中 events 从产生到展示的一个完整过程,最后也实现了一个简单的 demo,在此将 kubelet 对 events 的整个处理过程再梳理下,其中主要有三个对象 EventBroadcaster、EventRecorder、Broadcaster:


  • kubelet 首先会初始化 EventBroadcaster 对象,同时会初始化一个 Broadcaster 对象。

  • kubelet 通过 EventBroadcaster 对象的 NewRecorder() 方法初始化 EventRecorder 对象,EventRecorder 对象提供的几个方法会生成 events 并通过 Action() 方法发送 events 到 Broadcaster 的 channel 队列中。

  • Broadcaster 的作用就是接收所有的 events 并进行广播,Broadcaster 初始化后会在后台启动一个 goroutine,然后接收所有从 EventRecorder 发来的 events。

  • EventBroadcaster 对 events 有三个处理方法:

  • StartEventWatcher()

  • StartRecordingToSink()

  • StartLogging(),StartEventWatcher() 是其中的核心方法,会初始化一个 watcher 注册到 Broadcaster,其余两个处理函数对 StartEventWatcher() 进行了封装,并实现了自己的处理函数。

  • Broadcaster 中有一个 map 会保存每一个注册的 watcher,其会将所有的 events 广播给每一个 watcher,每个 watcher 通过它的 ResultChan() 方法从 channel 接收 events。

  • kubelet 会使用 StartRecordingToSink() 和 StartLogging() 对 events 进行处理,StartRecordingToSink() 处理函数收到 events 后会进行缓存、过滤、聚合而后发送到 apiserver,apiserver 会将 events 保存到 etcd 中,使用 kubectl 或其他客户端可以查看。StartLogging() 仅将 events 保存到 kubelet 的日志中。


第四课:KubeEdge 设备管理设计原理


晚 8:00 直播


识别下图二维码,加群获取课程材料



2019-12-31 16:462357

评论

发布
暂无评论
发现更多内容

再获殊荣 | 澳鹏Appen获评2024全球数据标注领域Top领军者

澳鹏Appen

数据标注

C++容器算法

EquatorCoco

c++ 容器 算法

流程挖掘价值实现的加速器!望繁信科技全链路解决方案惊艳刷屏

望繁信科技

数字化转型 流程挖掘 流程资产 流程智能 望繁信科技

一种PyInstaller中优雅的控制包大小的方法

不在线第一只蜗牛

Java JavaScript 数据库

人工智能赋能经管学科创新发展,和鲸助力同济大学打造教研训一体化实践

ModelWhale

人工智能 交叉学科 同济大学 经济与管理 同济大学MBA

Java元注解介绍

不在线第一只蜗牛

Java

SpringBoot多环境日志配置

快乐非自愿限量之名

Java Spring Boot 后端

Proxyless的多活流量和微服务治理

京东科技开发者

电报tg游戏游戏开发合成小游戏设计定制

V\TG【ch3nguang】

汽车乘客热舒适度大挑战,如何利用仿真技术提高汽车环境舒适度

Altair RapidMiner

人工智能 汽车 仿真 智能制造 altair

聚焦 AI 加持下泛娱乐场景的技术革新|RTE Plus 声网城市沙龙杭州站

声网

2025杭州国际智能建筑展览会

AIOTE智博会

智能楼宇展 智能楼宇展会 智能楼宇展览会 智能楼宇博览会

视频审核架构实践

京东科技开发者

清华大学ChatGLM大模型

霍格沃兹测试开发学社

今日分享丨微服务架构下查询数据缓存策略

inBuilder低代码平台

微服务 数据缓存

从零开始带你玩转 AI 变现公开课

霍格沃兹测试开发学社

京东商品信息快速获取:API返回值实战教程

技术冰糖葫芦

API Explorer API 测试 API 策略 pinduoduo API

开发体育直播平台:如何全面防护,抵御网络攻击?

软件开发-梦幻运营部

在 Go 中如何让结构体不可比较?

秃头小帅oi

openGauss- 智能基数估计

Gauss松鼠会

opengauss

京东平台内容合规的技术与挑战

京东科技开发者

淘宝商品详情API返回值中的商品材质与成分

技术冰糖葫芦

API Explorer API 测试 API 策略 pinduoduo API

JS脚本批量处理TS数据类型

快乐非自愿限量之名

JavaScript js 脚本

通过观测云 eBPF Tracing 实现无埋点的全链路追踪

观测云

ebpf

Kubernetes 中的事件处理机制_服务革新_InfoQ精选文章