QCon 演讲火热征集中,快来分享技术实践与洞见! 了解详情
写点什么

Kubernetes 中的事件处理机制

tianfeiyu

  • 2019-12-31
  • 本文字数:5559 字

    阅读完需:约 18 分钟

Kubernetes 中的事件处理机制

前言

当集群中的 node 或 pod 异常时,大部分用户会使用 kubectl 查看对应的 events,那么 events 是从何而来?


其实 K8s 中的各个组件会将运行时产生的各种事件汇报到 apiserver,对于 K8s 中的可描述资源,使用 kubectl describe 都可以看到其相关的 events,那 K8s 中又有哪几个组件都上报 events 呢?


只要在 k8s.io/kubernetes/cmd 目录下暴力搜索一下就能知道哪些组件会产生 events:



可以看出,controller-manage、kube-proxy、kube-scheduler、kubelet 都使用了 EventRecorder,本文只讲述 kubelet 中对 Events 的使用。


01


Events 的定义


events 在 k8s.io/api/core/v1/types.go 中进行定义,结构体如下所示:



image.png


其中 InvolvedObject 代表和事件关联的对象,source 代表事件源,使用 kubectl 看到的事件一般包含 Type、Reason、Age、From、Message 几个字段。


K8s 中 events 目前只有两种类型:“Normal” 和 “Warning”:



events 的两种类型


02


EventBroadcaster 的初始化


events 的整个生命周期都与 EventBroadcaster 有关,kubelet 中对 EventBroadcaster 的初始化在 k8s.io/kubernetes/cmd/kubelet/app/server.go 中:


func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {



// event 初始化


makeEventRecorder(kubeDeps, nodeName)



}


func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {


if kubeDeps.Recorder != nil { return }


// 初始化 EventBroadcaster


eventBroadcaster := record.NewBroadcaster()


// 初始化 EventRecorder


kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})


// 记录 events 到本地日志


eventBroadcaster.StartLogging(glog.V(3).Infof)


if kubeDeps.EventClient != nil {


glog.V(4).Infof(“Sending events to api server.”)


// 上报 events 到 apiserver


eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})


} else {


glog.Warning(“No api server defined - no events will be sent to API server.”)


}


}Kubelet 在启动的时候会初始化一个 EventBroadcaster,它主要是对接收到的 events 做一些后续的处理(保存、上报等),EventBroadcaster 也会被 kubelet 中的其他模块使用,以下是相关的定义,对 events 生成和处理的函数都定义在


k8s.io/client-go/tools/record/event.go 中:



EventBroadcaster 是个接口类型,该接口有以下四个方法:


  • StartEventWatcher():EventBroadcaster 中的核心方法,接收各模块产生的 events,参数为一个处理 events 的函数,用户可以使用 StartEventWatcher() 接收 events 然后使用自定义的 handle 进行处理

  • StartRecordingToSink():调用

  • StartEventWatcher() :接收 events,并将收到的 events 发送到 apiserver

  • StartLogging():也是调用 StartEventWatcher() 接收 events,然后保存 events 到日志

  • NewRecorder():会创建一个指定 EventSource 的 EventRecorder,EventSource 指明了哪个节点的哪个组件


eventBroadcasterImpl 是 eventBroadcaster 实际的对象,初始化 EventBroadcaster 对象的时候会初始化一个 Broadcaster,Broadcaster 会启动一个 goroutine 接收各组件产生的 events 并广播到每一个 watcher。


func NewBroadcaster() EventBroadcaster {


return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}


}


可以看到,kubelet 在初始化完 EventBroadcaster 后会调用 StartRecordingToSink() 和 StartLogging() 两个方法,StartRecordingToSink() 处理函数会将收到的 events 进行缓存、过滤、聚合而后发送到 apiserver,StartLogging() 仅将 events 保存到 kubelet 的日志中。


03


Events 的生成


从初始化 EventBroadcaster 的代码中可以看到 kubelet 在初始化完 EventBroadcaster 后紧接着初始化了 EventRecorder,并将已经初始化的 Broadcaster 对象作为参数传给了 EventRecorder,至此,EventBroadcaster、EventRecorder、Broadcaster 三个对象产生了关联。EventRecorder 的主要功能是生成指定格式的 events,以下是相关的定义:


type recorderImpl struct {


scheme *runtime.Scheme


source v1.EventSource


*watch.Broadcaster


clock clock.Clock


}


type EventRecorder interface {


Event(object runtime.Object, eventtype, reason, message string) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args …interface{})


PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args …interface{})


AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args …interface{})


}EventRecorder 中包含的几个方法都是产生指定格式的 events,Event() 和 Eventf() 的功能类似 fmt.Println() 和 fmt.Printf(),kubelet 中的各个模块会调用 EventRecorder 生成 events。recorderImpl 是 EventRecorder 实际的对象。EventRecorder 的每个方法会调用 generateEvent,在 generateEvent 中初始化 events 。


以下是生成 events 的函数:


func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {


ref, err := ref.GetReference(recorder.scheme, object)


if err != nil {


glog.Errorf(“Could not construct reference to: ‘%#v’ due to: ‘%v’. Will not report event: ‘%v’ ‘%v’ ‘%v’”, object, err, eventtype, reason, message)


return }


if !validateEventType(eventtype) {


glog.Errorf(“Unsupported event type: ‘%v’”, eventtype) return }


event := recorder.makeEvent(ref, annotations, eventtype, reason, message)


event.Source = recorder.source go func() {


// NOTE: events should be a non-blocking operation


defer utilruntime.HandleCrash() // 发送事件 recorder.Action(watch.Added, event)


}()


}func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {


t := metav1.Time{Time: recorder.clock.Now()}


namespace := ref.Namespace


if namespace == “” {


namespace = metav1.NamespaceDefault


}


return &v1.Event{


ObjectMeta: metav1.ObjectMeta{


Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),


Namespace: namespace,


Annotations: annotations,


},


InvolvedObject: *ref,


Reason: reason,


Message: message,


FirstTimestamp: t,


LastTimestamp: t,


Count: 1,


Type: eventtype,初始化完 events 后会调用 recorder.Action() 将 events 发送到 Broadcaster 的事件接收队列中, Action() 是 Broadcaster 中的方法。


以下是 Action() 方法的实现:



04


Events 的广播


上面已经说了,EventBroadcaster 初始化时会初始化一个 Broadcaster,Broadcaster 的作用就是接收所有的 events 并进行广播,Broadcaster 的实现在 k8s.io/apimachinery/pkg/watch/mux.go 中,Broadcaster 初始化完成后会在后台启动一个 goroutine,然后接收所有从 EventRecorder 发送过来的 events,Broadcaster 中有一个 map 会保存每一个注册的 watcher, 接着将 events 广播给所有的 watcher,每个 watcher 都有一个接收消息的 channel,watcher 可以通过它的 ResultChan() 方法从 channel 中读取数据进行消费。


以下是 Broadcaster 广播 events 的实现:



05


Events 的处理


那么 watcher 是从何而来呢?每一个要处理 events 的 client 都需要初始化一个 watcher,处理 events 的方法是在 EventBroadcaster 中定义的,以下是 EventBroadcaster 中对 events 处理的三个函数:


func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {


watcher := eventBroadcaster.Watch()


go func() {


defer utilruntime.HandleCrash()


for watchEvent := range watcher.ResultChan() {


event, ok := watchEvent.Object.(*v1.Event)


if !ok {


// This is all local, so there’s no reason this should


// ever happen.


continue


}


eventHandler(event)


}


}()


return watcher


StartEventWatcher() 首先实例化一个 watcher,每个 watcher 都会被塞入到 Broadcaster 的 watcher 列表中,watcher 从 Broadcaster 提供的 channel 中读取 events,然后再调用 eventHandler 进行处理,StartLogging() 和 StartRecordingToSink() 都是对 StartEventWatcher() 的封装,都会传入自己的处理函数。


func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args …interface{})) watch.Interface {


return eventBroadcaster.StartEventWatcher(


func(e *v1.Event) {


logf(“Event(%#v): type: ‘%v’ reason: ‘%v’ %v”, e.InvolvedObject, e.Type, e.Reason, e.Message)


})


StartLogging() 传入的 eventHandler 仅将 events 保存到日志中。


func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {


// The default math/rand package functions aren’t thread safe, so create a


// new Rand object for each StartRecording call.


randGen := rand.New(rand.NewSource(time.Now().UnixNano()))


eventCorrelator := NewEventCorrelator(clock.RealClock{})


return eventBroadcaster.StartEventWatcher(


func(event *v1.Event) {


recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)


})


}


func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, randGen *rand.Rand, sleepDuration time.Duration) {


eventCopy := *event


event = &eventCopy


result, err := eventCorrelator.EventCorrelate(event)


if err != nil {


utilruntime.HandleError(err)


}


if result.Skip { return }


tries := 0


for {


if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {


break


}


tries++


if tries >= maxTriesPerEvent {


glog.Errorf(“Unable to write event ‘%#v’ (retry limit exceeded!)”, event)


break


}


// 第一次重试增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件


if tries == 1 {


time.Sleep(time.Duration(float64(sleepDuration) * randGen.Float64()))


} else {


time.Sleep(sleepDuration)


}


}StartRecordingToSink() 方法先根据当前时间生成一个随机数发生器 randGen,增加随机数是为了在重试时增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件,接着实例化一个 EventCorrelator,EventCorrelator 会对事件做一些预处理的工作,其中包括过滤、聚合、缓存等操作,具体代码不做详细分析,最后将 recordToSink() 函数作为处理函数,recordToSink() 会将处理后的 events 发送到 apiserver,这是 StartEventWatcher() 的整个工作流程。


06


Events 简单实现


了解完 events 的整个处理流程后,可以参考其实现方式写一个 demo,要实现一个完整的 events 需要包含以下几个功能:


  1. 事件的产生

  2. 事件的发送

  3. 事件广播

  4. 事件缓存

  5. 事件过滤和聚合







此处仅简单实现,将 EventRecorder 处理 events 的功能直接放在了 EventBroadcaster 中实现,对 events 的处理方法仅实现了 StartLogging(),Broadcaster 中的部分功能是直接复制 K8s 中的代码,有一定的精简,其实现值得学习,此处对 EventCorrelator 并没有进行实现。


代码请参考:


https://github.com/gosoon/k8s-learning-notes/tree/master/k8s-package/events


07


总结


本文讲述了 K8s 中 events 从产生到展示的一个完整过程,最后也实现了一个简单的 demo,在此将 kubelet 对 events 的整个处理过程再梳理下,其中主要有三个对象 EventBroadcaster、EventRecorder、Broadcaster:


  • kubelet 首先会初始化 EventBroadcaster 对象,同时会初始化一个 Broadcaster 对象。

  • kubelet 通过 EventBroadcaster 对象的 NewRecorder() 方法初始化 EventRecorder 对象,EventRecorder 对象提供的几个方法会生成 events 并通过 Action() 方法发送 events 到 Broadcaster 的 channel 队列中。

  • Broadcaster 的作用就是接收所有的 events 并进行广播,Broadcaster 初始化后会在后台启动一个 goroutine,然后接收所有从 EventRecorder 发来的 events。

  • EventBroadcaster 对 events 有三个处理方法:

  • StartEventWatcher()

  • StartRecordingToSink()

  • StartLogging(),StartEventWatcher() 是其中的核心方法,会初始化一个 watcher 注册到 Broadcaster,其余两个处理函数对 StartEventWatcher() 进行了封装,并实现了自己的处理函数。

  • Broadcaster 中有一个 map 会保存每一个注册的 watcher,其会将所有的 events 广播给每一个 watcher,每个 watcher 通过它的 ResultChan() 方法从 channel 接收 events。

  • kubelet 会使用 StartRecordingToSink() 和 StartLogging() 对 events 进行处理,StartRecordingToSink() 处理函数收到 events 后会进行缓存、过滤、聚合而后发送到 apiserver,apiserver 会将 events 保存到 etcd 中,使用 kubectl 或其他客户端可以查看。StartLogging() 仅将 events 保存到 kubelet 的日志中。


第四课:KubeEdge 设备管理设计原理


晚 8:00 直播


识别下图二维码,加群获取课程材料



2019-12-31 16:462371

评论

发布
暂无评论
发现更多内容

架构实战训练营模块一

人生就是梦

架构实战营

HMS的舞者们,在智能世界的舞台

脑极体

学生管理系统架构设计

天天向上

架构实战营

什么是架构

天天向上

架构实战营

电商微服务拆分

白开水又一杯

#架构实战营

软件架构设计原则之依赖倒置原则

Tom弹架构

Java 架构 设计模式 设计原则

软件架构设计原则之单一职责原则

Tom弹架构

Java 架构 设计模式 设计原则

拆分电商系统为微服务

缘分呐

微服务 电商系统

架构实战营 拆分电商系统为微服务

💤 ZZzz💤

架构实战营

模块一作业

zjluoyue

ZK(ZooKeeper)分布式锁实现

牧小农

zookeeper ZooKeeper原理 zookeeper分布式锁

架构实战营-第三期-模块一作业

岚哲

极客时间 架构 架构实战营

Spring版本命名规则

Tom弹架构

Java spring 架构

软件架构设计原则之开闭原则

Tom弹架构

Java 架构 设计模式 设计原则

10分钟搞懂事件驱动API

俞凡

架构 API

模块一作业

hhh

「架构实战营」

学生管理系统

Mars

架构实战营 模块一

web安全:mysql提权总结篇

网络安全学海

黑客 网络安全 信息安全 渗透测试 WEB安全

电商系统微服务拆分

michael

架构实战营

模块六作业

potti

指标统计:基于流计算 Oceanus(Flink) 实现实时 UVPV 统计

腾讯云大数据

大数据 流计算 Oceanus

软件架构设计原则之合成复用原则

Tom弹架构

Java 架构 设计模式 设计原则

软件架构设计原则之接口隔离原则

Tom弹架构

Java 架构 设计模式 设计原则

架构实战营总结

gawaine

架构实战营

软件架构设计原则之迪米特法则

Tom弹架构

Java 架构 设计模式 设计原则

这样学BAT必面之软件设计原则,还不会就是我的问题

Tom弹架构

Java 架构 面试 设计模式 设计原则

模块6作业

4anonymous

【架构实战营作业】模块六——创业公司电商微服务架构

聆息

随便谈一下kafka消息队列

Regan Yue

kafka 10月月更

软件架构设计原则之里氏替换原则

Tom弹架构

Java 架构 设计模式 设计原则

Vue进阶(幺肆玖):template 标签

No Silver Bullet

Vue 模板 占位符 10月月更

Kubernetes 中的事件处理机制_服务革新_InfoQ精选文章