写点什么

Kubernetes 中定时任务的实现

  • 2020-01-10
  • 本文字数:2073 字

    阅读完需:约 7 分钟

Kubernetes 中定时任务的实现

Kubernetes 中定时任务的实现

K8s 中有许多优秀的包都可以在平时的开发中借鉴与使用,比如,任务的定时轮询、高可用的实现、日志处理、缓存使用等都是独立的包,可以直接引用。


本文将介绍 K8s 中定时任务的实现,K8s 中定时任务都是通过 wait 包实现的。wait 包在 K8s 的多个组件中都有用到,以下是 wait 包在 kubelet 中的几处使用:


func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {



// kubelet 每 5 分钟一次从 apiserver 获取证书


closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)


if err != nil {


return err


}


closeAllConns, err := kubeletcertificate.UpdateTransport(wait.NeverStop, clientConfig, clientCertificateManager, 5*time.Minute)


if err != nil {


return err


}



}



func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {


// 持续监听 pod 的变化


go wait.Until(func() {


k.Run(podCfg.Updates())


}, 0, wait.NeverStop)



}


golang 中可以通过 time.Ticker 实现定时任务的执行,但在 K8s 中用了更原生的方式,使用 time.Timer 实现的。time.Ticker 和 time.Timer 的使用区别如下:


  • ticker 只要定义完成,从此刻开始计时,不需要任何其他的操作,每隔固定时间都会自动触发。

  • timer 定时器是到了固定时间后会执行一次,仅执行一次

  • 如果 timer 定时器要每隔间隔的时间执行,实现 ticker 的效果,使用 func (t *Timer) Reset(d Duration) bool


一个示例:


package main


import (


“fmt”


“sync”


“time”


)func main() {


var wg sync.WaitGroup timer1 := time.NewTimer(2 * time.Second)


ticker1 := time.NewTicker(2 * time.Second) wg.Add(1)


go func(t *time.Ticker) {


defer wg.Done()


for {


<-t.C


fmt.Println(“exec ticker”, time.Now().Format(“2006-01-02 15:04:05”))


}


}(ticker1) wg.Add(1)


go func(t *time.Timer) {


defer wg.Done()


for {


<-t.C


fmt.Println(“exec timer”, time.Now().Format(“2006-01-02 15:04:05”))


t.Reset(2 * time.Second)


}


}(timer1) wg.Wait()


}01


wait 包中的核心代码


核心代码


k8s.io/apimachinery/pkg/util/wait/wait.go:


func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {


var t *time.Timer


var sawTimeout bool


for {


select {


case <-stopCh:


return


default:


} jitteredPeriod := period


if jitterFactor > 0.0 {


jitteredPeriod = Jitter(period, jitterFactor)


} if !sliding {


t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)


} func() {


defer runtime.HandleCrash()


f()


}() if sliding {


t = resetOrReuseTimer(t, jitteredPeriod, sawTimeout)


} select {


case <-stopCh:


return


case <-t.C:


sawTimeout = true


}


}


}…


func resetOrReuseTimer(t *time.Timer, d time.Duration, sawTimeout bool) *time.Timer {


if t == nil {


return time.NewTimer(d)


}


if !t.Stop() && !sawTimeout {


<-t.C


}


t.Reset(d)


return t


}几个关键点的说明:


  1. 如果 sliding 为 true,则在 f() 运行之后计算周期。如果为 false,那么 period 包含 f() 的执行时间。

  2. 在 golang 中 select 没有优先级选择,为了避免额外执行 f(),在每次循环开始后会先判断 stopCh chan。


K8s 中 wait 包其实是对 time.Timer 做了一层封装实现。


02


wait 包常用的方法


定期执行一个函数,永不停止,可以使用 Forever 方法:


func Forever(f func(), period time.Duration)


在需要的时候停止循环,那么可以使用下面的方法,增加一个用于停止的 chan 即可,方法定义如下:


func Until(f func(), period time.Duration, stopCh <-chan struct{})


上面的第三个参数 stopCh 就是用于退出无限循环的标志,停止的时候我们 close 掉这个 chan 就可以了。


有时候,我们还会需要在运行前去检查先决条件,在条件满足的时候才去运行某一任务,这时候可以使用 Poll 方法:


func Poll(interval, timeout time.Duration, condition ConditionFunc)


这个函数会以 interval 为间隔,不断去检查 condition 条件是否为真,如果为真则可以继续后续处理;如果指定了 timeout 参数,则该函数也可以只常识指定的时间。


PollUntil 方法和上面的类似,但是没有 timeout 参数,多了一个 stopCh 参数,如下所示:


PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error


此外还有 PollImmediate 、 PollInfinite 和 PollImmediateInfinite 方法。


03


总结


本文主要讲了 K8s 中定时任务的实现与对应包(wait)中方法的使用。


通过阅读 K8s 的源代码,可以发现 K8s 中许多功能的实现也都是我们需要在平时工作中用的,其大部分包的性能都是经过大规模考验的,通过使用其相关的工具包不仅能学到大量的编程技巧也能避免自己造轮子。


原文:https://dwz.cn/5zxd9ZqU


2020-01-10 13:531367

评论

发布
暂无评论
发现更多内容

赞!一篇博客讲解清楚 Python queue模块,作为Python爬虫预备知识,用它解决采集队列问题

梦想橡皮擦

11月日更

Github webhooks 自动部署博客文章,使用总结【含视频】

小傅哥

GitHub 小傅哥 WEBHOOKS 自动部署 通知回调

在华为云专属月,找到开启互联网第二增长曲线的一把钥匙

脑极体

Android C++系列:JNI操作Bitmap

轻口味

c++ android jni 11月日更

一文告诉你 K8s PR (Pull Request) 怎样才能被 merge?

腾源会

k8s

进击的Java(七)

ES_her0

11月日更

消息队列表设计

Rabbit

面试官:讲讲雪花算法,越详细越好

秦怀杂货店

分布式 雪花算法

Prometeus 2.31.0 新特性

耳东@Erdong

release Prometheus 11月日更

【高并发】通过源码深度解析ThreadPoolExecutor类是如何保证线程池正确运行的

冰河

Java 并发编程 多线程 高并发 异步编程

npm必知必会点

废材壶

大前端 npm Node

【LeetCode】反转链表Java题解

Albert

算法 LeetCode 11月日更

Ubuntu系统下《汇编语言》环境配置

codists

汇编语言

数据库连接池Demo(1)单线程初步

Java 数据库 连接池

腾讯发布 K8s 多集群管理开源项目 Clusternet

腾源会

开源 K8s 多集群管理 Clusternet

CNCF 沙箱再添“新将”!云原生边缘容器开源项目 SuperEdge 正式入选

腾源会

开源 容器 云原生 cncf

模块八作业:设计消息队列存储消息数据的 MySQL 表格

apple

Golang Gin 框架入门介绍(二)

liuzhen007

11月日更

多模态内容理解算法框架项目 Lichee 正式开源,为微服务开源社区贡献力量

腾源会

开源

我在 IBM 从事开源工作的十一年

腾源会

开源

[ CloudWeGo 微服务实践 - 08 ] Nacos 服务发现扩展 (2)

baiyutang

golang 微服务 11月日更

干货分享:细说双 11 直播背后的压测保障技术

阿里巴巴云原生

阿里云 云原生 性能测试 PTS

如何评价一个开源项目(一)--活跃度

腾源会

开源

SuperEdge 和 FabEdge 联合在边缘 K8s 集群支持原生 Service 云边互访和 PodIP 直通

腾源会

开源 边缘计算 superedge

腾讯自研分布式远程Shuffle服务Firestorm正式开源

腾源会

大数据 开源 腾讯

架构训练营 模块三 作业

dog_brother

「架构实战营」

腾讯开源全景图再刷新:社区贡献领跑国内企业,获超过38万开发者关注

腾源会

开源 腾讯

flutter小部件知多少?

坚果

flutter 11月日更

怎么清空.NET数据库连接池

喵叔

11月日更

腾讯云原生开源生态专场召开,洞察开源云原生技术发展趋势和商业化路径

腾源会

腾讯云 开源 云原生

Serverless 架构模式及演进

阿里巴巴云原生

阿里云 Serverless 云原生 架构模式

Kubernetes 中定时任务的实现_语言 & 开发_华为云原生团队_InfoQ精选文章