写点什么

详解 Kubernetes Job 和 CronJob 的实现原理

  • 2019-12-03
  • 本文字数:6274 字

    阅读完需:约 21 分钟

详解 Kubernetes Job 和 CronJob 的实现原理

之前介绍了 Kubernetes 中用于长期提供服务的 ReplicaSetDeploymentStatefulSetDaemonSet 等资源,但是作为一个容器编排引擎,任务和定时任务的支持是一个必须要支持的功能。


Kubernetes 中使用 Job 和 CronJob 两个资源分别提供了一次性任务和定时任务的特性,这两种对象也使用控制器模型来实现资源的管理,我们在这篇文章种就会介绍它们的实现原理。

概述

Kubernetes 中的 Job 可以创建并且保证一定数量 Pod 的成功停止,当 Job 持有的一个 Pod 对象成功完成任务之后,Job 就会记录这一次 Pod 的成功运行;当一定数量的 Pod 的任务执行结束之后,当前的 Job 就会将它自己的状态标记成结束。



上述图片中展示了一个 spec.completions=3 的任务的状态随着 Pod 的成功执行而更新和迁移状态的过程,从图中我们能比较清楚的看到 Job 和 Pod 之间的关系,假设我们有一个用于计算圆周率的如下任务:


YAML


apiVersion: batch/v1kind: Jobmetadata:  name: pispec:  completions: 10  parallelism: 5  template:    spec:      containers:      - name: pi        image: perl        command: ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]      restartPolicy: Never  backoffLimit: 4
复制代码


当我们在 Kubernetes 中创建上述任务时,使用 kubectl 能够观测到以下的信息:


Bash


$ k apply -f job.yamljob.batch/pi created
$ kubectl get job --watchNAME COMPLETIONS DURATION AGEpi 0/10 1s 1spi 1/10 36s 36spi 2/10 46s 46spi 3/10 54s 54spi 4/10 60s 60spi 5/10 65s 65spi 6/10 90s 90spi 7/10 99s 99spi 8/10 104s 104spi 9/10 107s 107spi 10/10 109s 109s
复制代码


由于任务 pi 在配置时指定了 spec.completions=10,所以当前的任务需要等待 10 个 Pod 的成功执行,另一个比较重要的 spec.parallelism=5 表示最多有多少个并发执行的任务,如果 spec.parallelism=1 那么所有的任务都会依次顺序执行,只有前一个任务执行成功时,后一个任务才会开始工作。



每一个 Job 对象都会持有一个或者多个 Pod,而每一个 CronJob 就会持有多个 Job 对象,CronJob 能够按照时间对任务进行调度,它与 crontab 非常相似,我们可以使用 Cron 格式快速指定任务的调度时间:


YAML


apiVersion: batch/v1beta1kind: CronJobmetadata:  name: pispec:  schedule: "*/1 * * * *"  jobTemplate:    spec:      completions: 2      parallelism: 1      template:        spec:          containers:          - name: pi            image: perl            command: ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]          restartPolicy: OnFailure
复制代码


上述的 CronJob 对象被创建之后,每分钟都会创建一个新的 Job 对象,所有的 CronJob 创建的任务都会带有调度时的时间戳,例如:pi-1551660600 和 pi-1551660660 两个任务:


Bash


$ k get cronjob --watchNAME   SCHEDULE      SUSPEND   ACTIVE   LAST SCHEDULE   AGEpi    */1 * * * *    False     0        <none>          3spi    */1 * * * *    False     1        1s              7s
$ k get job --watchNAME COMPLETIONS DURATION AGEpi-1551660600 0/3 0s 0spi-1551660600 1/3 16s 16spi-1551660600 2/3 31s 31spi-1551660600 3/3 44s 44spi-1551660660 0/3 1spi-1551660660 0/3 1s 1spi-1551660660 1/3 14s 14spi-1551660660 2/3 28s 28spi-1551660660 3/3 42s 43s
复制代码


CronJob 中保存的任务其实是有上限的,spec.successfulJobsHistoryLimitspec.failedJobsHistoryLimit 分别记录了能够保存的成功或者失败的任务上限,超过这个上限的任务都会被删除,默认情况下这两个属性分别为 spec.successfulJobsHistoryLimit=3spec.failedJobsHistoryLimit=1

任务

Job 遵循 Kubernetes 的控制器模式进行设计,在发生需要监听的事件时,Informer 就会调用控制器中的回调将需要处理的资源 Job 加入队列,而控制器持有的工作协程就会处理这些任务。



用于处理 Job 资源的 JobController 控制器会监听 Pod 和 Job 这两个资源的变更事件,而资源的同步还是需要运行 syncJob 方法。

同步

syncJobJobController 中主要用于同步 Job 资源的方法,这个方法的主要作用就是对资源进行同步,它的大体架构就是先获取一些当前同步的基本信息,然后调用 manageJob 方法管理 Job 对应的 Pod 对象,最后计算出处于 activefailedsucceed 三种状态的 Pod 数量并更新 Job 的状态:


Go


func (jm *JobController) syncJob(key string) (bool, error) {  ns, name, _ := cache.SplitMetaNamespaceKey(key)  sharedJob, _ := jm.jobLister.Jobs(ns).Get(name)  job := *sharedJob
jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
pods, _ := jm.getPodsForJob(&job)
activePods := controller.FilterActivePods(pods) active := int32(len(activePods)) succeeded, failed := getStatus(pods) conditions := len(job.Status.Conditions)
复制代码


这一部分代码会从 apiserver 中该名字对应的 Job 对象,然后获取 Job 对应的 Pod 数组并根据计算不同状态 Pod 的数量,为之后状态的更新和比对做准备。


Go


var manageJobErr error  if jobNeedsSync && job.DeletionTimestamp == nil {    active, manageJobErr = jm.manageJob(activePods, succeeded, &job)  }
复制代码


准备工作完成之后,会调用 manageJob 方法,该方法主要负责管理 Job 持有的一系列 Pod 的运行,它最终会返回目前集群中当前 Job 对应的活跃任务的数量。


Go


completions := succeeded  complete := false  if job.Spec.Completions == nil {    if succeeded > 0 && active == 0 {      complete = true    }  } else {    if completions >= *job.Spec.Completions {      complete = true    }  }  if complete {    job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))    now := metav1.Now()    job.Status.CompletionTime = &now  }
if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions { job.Status.Active = active job.Status.Succeeded = succeeded job.Status.Failed = failed
jm.updateHandler(&job) }
return forget, manageJobErr}
复制代码


最后的这段代码会将 Job 规格中设置的 spec.completions 和已经完成的任务数量进行比对,确认当前的 Job 是否已经结束运行,如果任务已经结束运行就会更新当前 Job 的完成时间,同时当 JobController 发现有一些状态没有正确同步时,也会调用 updateHandler 更新资源的状态。

并行执行

Pod 的创建和删除都是由 manageJob 这个方法负责的,这个方法根据 Job 的 spec.parallelism 配置对目前集群中的节点进行创建和删除。



如果当前正在执行的活跃节点数量超过了 spec.parallelism,那么就会按照创建的升删除多余的任务,删除任务时会使用 DeletePod 方法:


Go


func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {  active := int32(len(activePods))  parallelism := *job.Spec.Parallelism
if active > parallelism { diff := active - parallelism sort.Sort(controller.ActivePods(activePods))
active -= diff wait := sync.WaitGroup{} wait.Add(int(diff)) for i := int32(0); i < diff; i++ { go func(ix int32) { defer wait.Done() jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job) }(i) } wait.Wait()
复制代码


当正在活跃的节点数量小于 spec.parallelism 时,我们就会根据当前未完成的任务数和并行度计算出最大可以处于活跃的 Pod 个数 wantActive 以及与当前的活跃 Pod 数相差的 diff


Go


} else if active < parallelism {    wantActive := int32(0)    if job.Spec.Completions == nil {      if succeeded > 0 {        wantActive = active      } else {        wantActive = parallelism      }    } else {      wantActive = *job.Spec.Completions - succeeded      if wantActive > parallelism {        wantActive = parallelism      }    }    diff := wantActive - active    if diff < 0 {      diff = 0    }
active += diff
复制代码


在方法的最后就会以批量的方式并行创建 Pod,所有 Pod 的创建都是通过 CreatePodsWithControllerRef 方法在 Goroutine 中执行的。



这里会使用 WaitGroup 等待每个 Batch 中创建的结果返回才会执行下一个 Batch,Batch 的大小是从 1 开始指数增加的,以冷启动的方式避免首次创建的任务过多造成失败:


Go


wait := sync.WaitGroup{}    for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {      wait.Add(int(batchSize))      for i := int32(0); i < batchSize; i++ {        go func() {          defer wait.Done()          jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))        }()      }      wait.Wait()      diff -= batchSize    }  }
return active, nil}
复制代码


通过对 manageJob 的分析,我们其实能够看出这个方法就是根据规格中的配置对 Pod 进行管理,它在较多并行时删除 Pod,较少并行时创建 Pod,也算是一个简单的资源利用和调度机制,代码非常直白并且容易理解,不需要花太大的篇幅。

定时任务

用于管理 CronJob 资源的 CronJobController 虽然也使用了控制器模式,但是它的实现与其他的控制器不太一样,他没有从 Informer 中接受其他消息变动的通知,而是直接访问 apiserver 中的数据:



从 apiserver 中获取了 Job 和 CronJob 的信息之后就会调用 JobControl 向 apiserver 发起请求创建新的 Job 资源,这一个过程都是由 CronJobControllersyncAll 方法驱动的,我们接下来就来介绍这一方法的实现。

同步

syncAll 方法会从 apiserver 中取出所有的 Job 和 CronJob 对象,然后通过 groupJobsByParent 将任务按照 spec.ownerReferences 进行分类并遍历去同步所有的 CronJob:


Go


func (jm *CronJobController) syncAll() {  jl, _ := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})  js := jl.Items
sjl, _ := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{}) sjs := sjl.Items
jobsBySj := groupJobsByParent(js)
for _, sj := range sjs { syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder) cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.recorder) }}
复制代码


syncOne 就是用于同步单个 CronJob 对象的方法,这个方法会首先遍历全部的 Job 对象,只保留正在运行的活跃对象并更新 CronJob 的状态:


Go


func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {  childrenJobs := make(map[types.UID]bool)  for _, j := range js {    childrenJobs[j.ObjectMeta.UID] = true    found := inActiveList(*sj, j.ObjectMeta.UID)    if found && IsJobFinished(&j) {      deleteFromActiveList(sj, j.ObjectMeta.UID)    }  }
for _, j := range sj.Status.Active { if found := childrenJobs[j.UID]; !found { deleteFromActiveList(sj, j.UID) } }
updatedSJ, _ := sjc.UpdateStatus(sj) *sj = *updatedSJ
复制代码


随后的 getRecentUnmetScheduleTimes 方法会根据 CronJob 的调度配置 spec.schedule 和上一次执行任务的时间计算出我们缺失的任务次数。


Go


times, _ := getRecentUnmetScheduleTimes(*sj, now)  if len(times) == 0 {    return  }
scheduledTime := times[len(times)-1] if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 { return } if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent { for _, j := range sj.Status.Active { job, _ := jc.GetJob(j.Namespace, j.Name) if !deleteJob(sj, job, jc, recorder) { return } } }
复制代码


如果现在需要调度新的任务,但是当前已经存在活跃的任务,就会根据并发策略的配置执行不同的操作:


  1. 使用 ForbidConcurrent 策略跳过这一次任务的调度直接返回;

  2. 使用 ReplaceConcurrent 策略获取并删除全部活跃的任务,通过创建新的 Pod 替换这些正在执行的活跃 Pod;


Go


jobReq, _ := getJobFromTemplate(sj, scheduledTime)  jobResp, _ := jc.CreateJob(sj.Namespace, jobReq)
ref,_ := getRef(jobResp) sj.Status.Active = append(sj.Status.Active, *ref) sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime} sjc.UpdateStatus(sj)
return}
复制代码


在方法的最后,它会从 CronJob 的 spec.jobTemplate 中拿到创建 Job 使用的模板并调用 JobControlCreateJob 向 apiserver 发起创建任务的 HTTP 请求,接下来的操作就都是由 JobController 负责了。

总结

Job 作为 Kubernetes 中用于处理任务的资源,与其他的资源没有太多的区别,它也使用 Kubernetes 中常见的控制器模式,监听 Informer 中的事件并运行 syncHandler 同步任务


而 CronJob 由于其功能的特殊性,每隔 10s 会从 apiserver 中取出资源并进行检查是否应该触发调度创建新的资源,需要注意的是 CronJob 并不能保证在准确的目标时间执行,执行会有一定程度的滞后。


两个控制器的实现都比较清晰,只是边界条件比较多,分析其实现原理时一定要多注意。

相关文章

Referenece


**本文转载自 Draveness 技术博客。


原文链接:https://draveness.me/kubernetes-job-cronjob


2019-12-03 15:121348

评论

发布
暂无评论
发现更多内容

电商订单全流程可观测性最佳实践

观测云

[Go WebSocket] 为什么我选用Go重构Python版本的WebSocket服务?

HullQin

Go golang 后端 websocket 8月月更

Java进阶(一)内存解析

No Silver Bullet

Java 9月月更 内存解析

rocksdb和innodb的一些区别

趁早

区块链交易隐私如何保证?华为零知识证明技术实战解析

创意时空

【算法实践】一天路走到黑--手把手带你实现坚持不懈的线性查找

迷彩

Python 数据结构 算法实践 8月月更 线性查找

微服务网关Gateway实践总结

Java 架构

低代码是什么?国内排名前 5 的低代码开发平台对比

蒋川

低代码 开发工具 开发平台

iofod导入任意前端资产,以 Element UI 为例

iofod jude

小程序 前端 低代码 网页

Network源码接口分析

长安链

影视动漫制作为什么要选择云渲染农场?

Finovy Cloud

计算器 云渲染 影视渲染

Go 代码城市上云——KusionStack 实践

SOFAStack

开源

【编程实践】认识爬虫并手把手带手实现新闻网站的爬取

迷彩

记录 Python爬虫 8月月更 网络爬虫

每日一R「21」Unsafe Rust

Samson

学习笔记 8月月更 ​Rust

【JVM】HotspotJVM精通垃圾回收器原理

小明Java问道之路

8月月更

技术解析+代码实战,带你入门华为云政务区块链平台

创意时空

构建万物可信的基石:解密区块链跨链技术

创意时空

IDEA配置tomcat

楠羽

#开源

艺术收藏NFT系统开发:NFT功能搭建

开源直播系统源码

数字藏品 数字藏品系统软件开发 数字藏品开发

基于Vue3常用代码块

青柚1943

typescript Vue3 Element Plus Pinia sortablejs

【JVM】HotspotJVM 分代回收机制

小明Java问道之路

8月月更

C/CPP基础练习题多维数组,矩阵转置,杨辉三角详解

CtrlX

c c++ 基础 8月月更

一起学习设计模式:责任链模式

宇宙之一粟

设计模式 8月月更

金融科技创新者的困境

木风

金融科技 数字化转型 科技创新

自然语言处理--神经网络的复习

IT蜗壳-Tango

自然语言处理 nlp 9月月更

玩转KubeEdge保姆级攻略

乌龟哥哥

8月月更

[教你做小游戏] 展示斗地主扑克牌,支持按出牌规则排序!支持按大小排序!

HullQin

CSS JavaScript html 前端 9月月更

创投基金黑钻资本Black3Lab Capital主投互联网3.0

股市老人

这些智能合约漏洞,可能会影响你的账户安全!

创意时空

阿里云高性能计算负责人何万青:阿里云大计算加速HPC与AI融合

阿里云弹性计算

AI HPC 高性能计算 无影云电脑 计算巢

详解 Kubernetes Job 和 CronJob 的实现原理_文化 & 方法_Draveness_InfoQ精选文章