写点什么

伴鱼分布式调度系统 Jarvis 的设计与实现

  • 2021-07-08
  • 本文字数:10662 字

    阅读完需:约 35 分钟

伴鱼分布式调度系统 Jarvis 的设计与实现

背景


随着伴鱼课程业务需求和用户量的快速增长,涉及到实时和延时任务的场景也越来越多。例如课程录制、课程视频转码、课程视频上传以及相关的课程视频分析、老师学生行为分析、语音识别、情绪识别等算法离线预测任务等。这些任务都需要大量的计算、存储、网络等资源,而且不同的场景对任务的执行时间,调度策略又有不同的要求。如果由业务方来各自管理机器资源并且监控每个任务的状态,累积的维护成本会非常高,而且不方便统一管理。从整体来看,在资源有限的情况下,简单的调度逻辑已无法同时满足全部的任务需求。我们需要一个能进行任务调度、任务编排、异构资源管理、任务监控的分布式任务调度解决方案。但是如何在合理高效管理任务的前提下去做到节约资源成本,就成了我们需要面对的一个问题。


以录制任务为例,为了满足高峰期的需求,有二十多台 64Core 128G Memory 的物理机全天运转提供服务。然而实际上大部分时间机器资源都是闲置的,只有在用户上课时才会有任务执行,但为了用户体验和实时类课程录制,又不能临时减少机器数量。在类似业务的背景下,我们对系统功能要求进行了整理,并对业界开源项目和第三方产品进行了调研。

功能要求


  • 支持实时任务,进行秒级调度。

  • 支持并发场景下批量创建任务。

  • 支持管理异构资源。物理机、K8S 集群、ECI、EKS 等。

  • 支持与运行中任务进行通信。

  • 支持任务状态监控。

  • 支持任务结果回调。

  • 支持任务的失败重试。

  • 支持工作流,编排 DAG。

  • 支持优先级调度,并且保证资源的优先级亲缘性。

  • 支持抢占式调度。紧急任务可直接抢占资源。

  • 保证任务只会被精确调度和执行一次。

  • 支持弹性资源调度。

  • 支持任务资源池。

  • 支持多租户。

  • 支持定时任务。Crontab、Fix delay、Fix rate 等。

  • 支持日志查询。

  • 支持 Shell/Python/Golang 等执行脚本。

  • 支持容器创建,删除,管理等。

产品调研


业界内关于任务调度的开源项目和第三方产品有很多,我们主要调研了其中几个产品,并进行了几项指标的对比。


基于我们自身的需求背景及产品调研结果,主要考虑到如下原因:


  • 开源项目和第三方产品都不能完全满足我们的需求, 比如接入 ECI、EKS 等弹性容器,异构资源管理,支持容器,服务治理,Kubernetes 集群等。

  • 自研更容易适配伴鱼的基础框架和技术工具。

  • 自研系统的架构可灵活调整,并适配业务。

  • 对开源项目做二次开发或者封装第三方 SDK 的开发和维护成本也不低。


因此,我们选择自研一套能够完全满足内部需求的分布式任务调度系统,并取名为 Jarvis (钢铁侠中的智能管家)。在资源上,Jarvis 系统可以接管业务方指定的物理机,云主机,K8S 集群, ECI, EKS 等资源,不同类型的任务可以做到资源隔离和动态管理。除此之外,Jarvis 系统还借助了 ECI 和 EKS 这些弹性容器服务的能力,在物理资源不足时,可以将容器任务调度到上面进行执行。(ECI, EKS:可以理解为一个按使用量计费的,无限容量的 K8S 集群)。

架构设计


Jarvis 系统主要有四大模块:JobManager、Scheduler、ResourceManager、Worker,每个模块都以集群方式部署。

模块介绍

JobManager


负责管理任务的生命周期,维护任务的依赖关系(DAG 编排),支持定时任务,实时任务的创建和管理,监控任务状态,管理任务的生命周期,维护任务状态机。Job Manager 负责监控任务的运行状态、管理任务的生命周期,处理实时/定时/延时任务,另外 Job Manager 还负责监控超时任务,对任务查杀和强行释放资源。

Scheduler


负责对任务进行调度,通过给 ResourceManager 发送任务进行资源绑定,并将分配到资源的任务 dispatch 到指定位置。Jarvis 调度系统的大脑,它从 Job Manager 中获取需要执行的任务,根据任务的类型、等待时间、优先级等信息,按照多种调度算法,对任务进行调度并将任务分发给合理的 Worker 来执行任务。

ResourceManager


负责管理业务方所有可用资源,包括但不限于物理机,云主机,K8S 集群等,并将 Scheduler 推送过来的任务绑定最合适的资源。作为 Jarvis 调度系统的资源管理中心,它还负责将物理机、K8S 集群等资源注册到缓存和数据库,将这些资源统一管理,并监控资源的负载情况和资源使用信息。除此之外,ResourceManager 集成了资源的打分,分配,调度方案,可作为插拔式插件进行更新。

Worker


该模块部署在宿主机上,可以使用容器或二进制形式部署,负责向 ResourceManager 上报机器资源使用情况、健康状态、心跳检查等,并向 Job Manager 上报任务执行状态。最终都会通过 Worker 执行作业与任务。Jarvis 调度系统中的任务执行和分发者,接收并执行由 Scheduler 分发的任务、接收并汇报任务的运行结果。实时向 ResourceManager 回报资源的使用情况、健康状态、心跳等信息,确保物理机资源能够被 Jarvis 管理。

模块细节

JobManager 模块


Job Manager 并不是严格的去中心化设计,而是通过 Etcd 分布式锁选举出 Master 节点,Master 节点相比其它 Slave 节点多了一些全局的监控工作,但不会直接与其它节点存在关联。

任务模型


在 JobManager 中实现了 3 种任务模型:


  • 实时任务:一次创建,一次执行。

  • 定时任务:一次创建,定时执行(周期性执行或指定时间一次性执行)。

  • DAG:根据依赖关系执行。

支持的任务类型


目前我们支持以下任务类型:


  • 容器

  • Golang 脚本

  • Python 脚本

  • Shell 脚本

  • HTTP

  • 自定义任务

  • 其他


对于脚本类的任务,需要提供具体的脚本内容。对于容器类型的任务,需要提供任务镜像,启动参数,环境变量等。因为容器可以方便地限制 CPU、Memory 等资源的使用,而且在 ECI 的助力下,很少会出现资源不足的情况。


Jarvis 系统接入的第一个任务是直播中台的课程录制业务,业务方将原有服务中的录制逻辑抽离出来进行了容器化,在 Jarvis 中以容器形式运行。在接入录制任务的过程中,业务方提出了一些新的需求,比如客户端需要切换录制 SDK,上游服务可调用 Jarvis 的指令发送接口给任务发送切换 SDK 指令,但是 Docker 本身是不支持的,我们最终通过 Docker Exec API + IPC 打通了物理机以及 ECI 上容器任务的通信。另外 Jarvis 还支持自定义 Processor(可以理解为任务插件),可以直接在机器上执行特定的任务。


通过 Job Manager 创建任务的时候,可以设置限制资源(CPU、Memory、GPU 等)的参数,对于容器任务,容器底层自身可以做到严格的资源限制,对于脚本类和自定义 Processor 任务,我们会使用 Linux Cgroup、Namespace 技术来实现资源隔离和限制。

核心接口


  • CreateJob: 创建实时任务。

  • CreateDag: 创建 DAG 任务。

  • CreateCronJob: 创建定时任务。

  • UpdateCronJobExpression: 更新运行中 CornJob 的 cron 表达式。

  • StopCronJob: 暂时停止定时任务,可恢复。

  • KillJob: 强制终止任务。

  • SendCommandToJob: 向运行中的任务发送指令。

任务状态



任一时刻,Job 只会处于以下一种状态


  • Init (初始态):初始化 Job 状态。

  • Schedulable:可被调度状态,Job Manager 收到的。

  • Scheduling:正在调度中状态。

  • Schedulable 的 Job 被 Scheduler 拉取后,JobManager 修改为状态。

  • Pending:待运行的状态。任务已经被 dispatch 到 worker ,但还未开始执行。

  • Running:任务正在运行的状态。Worker 上传给 Job Manager 的 Job 状态。

  • Killed:任务被终止的状态。Worker 上报的被驱逐或抢占的 Job 状态,如重试次数大于 0 改为 Schedulable 状态,否则改为 Failed 状态。

  • Retryable(临时态):任务正在重试的状态。任务执行失败或被 kill 后,如重试次数大于 0 改为 Schedulable 状态,否则改为 Failed 状态。

  • Failed(最终态):任务最终执行失败的 Job 状态。

  • Succeeded(最终态):任务最终执行完成的 Job 状态。


Job 的状态变化主要来自 Scheduler 和 Worker 的上报以及 Job Manager 的监控,同一个 Job 多个状态的上报存在并发问题,可能会造成缓存与数据库的不一致。为此我们实现了支持自动续期的 Redis 分布式锁来保证状态变化的原子性。

任务执行流程


JobManager 收到客户端提交的请求后,通过分布式 ID 生成器生成 JobId ,将其放入 Redis Set 中,并将 Job 信息持久化到 DB。创建 Job 的参数如下:


type CreateJobReq struct {	TeamId         string            //业务方标识	AppName        string            //业务名称	Name           string            //任务名称	Description    string            //任务描述	Creator        string            //创建者	Timezone       string            //时区	Retries        int32             //重试次数	RetryInterval  int32             //重试间隔	Priority       int32             //优先级	Concurrency    bool              //是否并发执行	Executor       string            //http,docker、eci...	ExecutorMode   string            //执行模式	ExecutorConfig map[string]string //任务参数及配置	Cpu            float64           //cpu需求	Memory         int32             //memory需求	Gpu            int32             //gpu需求	Timeout        int32             //超时时间	CallbackUrl    string            //回调URL	...}
复制代码


此时 Job 为 Schedulable 状态,等待被 Scheduler 调度。Scheduler 会定时批量从 Job Manager 维护的 Redis Set 中拉取任务。Job 被拉取后,状态变更为 Scheduling ,然后由 Scheduler 根据 Job 指定的调度策略向 Resource Manager 申请资源,成功申请到资源后,Scheduler 将 Job 指派到对应资源上的 Worker,此时 Job 状态变更为 Pending, 在 Worker 启动任务成功后,任务状态变更为 Running。



定时任务的实现


常见用来实现定时任务的 DelayQueue 和 Cron,底层都是最小堆,单次插入删除的平均时间复杂度是 O(log n), 如果堆的大小已经达到 100w,那么每次插入都需要将近 20 次操作(2^20 = 1048576)。Jarvis 的设计目标是能同时维护百万级的定时任务,在这种情况下,用常见方式去执行创建任务、停止任务等操作将会非常耗时,为此我们需要一种更高级的数据结构:时间轮,而其可以达到近乎 O(1) 的时间复杂度。在海量任务场景下(百万级别),每次插入新的任务,时间轮要比最小堆少 19 次操作。参考 Kafka 中时间轮算法的实现,我们基于 Golang 实现了高性能的层级时间轮并应用到 Job Manager 中。


最简单的时间轮就是一个固定大小的循环列表,其中每格代表一个时间间隔,包含一个双向链表用来维护某一时刻下的任务列表。很明显,这种单层时间轮无法表示较大的时间跨度,且在初始化后无法管理超过跨度的定时任务。层级时间轮通过按需创建多个时间轮,并对每层时间轮设置不同的时间跨度,有效地解决了单层时间轮的缺点。当定时任务超过层级时间轮当前最大时间跨度后,会创建 N 倍与当前跨度的高层时间轮,其中的 N 是上述提到的循环列表格数。随着时间的流逝,高层的时间轮中的任务会被逐步降级插入到下层时间轮中,直到达到最底层时间轮的当前时刻指针,任务被取出,移出时间轮。


对于单个时间轮来说,目前可以达到 ms 级的精度,在 tick = 1ms,时间轮的时间格个数 timewheelsize = 60 时,第一层时间轮的跨度为 60ms,第二层时间轮的跨度为 6060ms = 3.6s,第三层时间轮的跨度为 603.6s = 216s … 第七层时间轮的时间跨度为 88.7 year,仅需七层就足以满足业务上的需求。


在实践过程中,我们也发现了 Jarvis 直接使用时间轮算法的一些问题,例如没有做备份,当服务器宕机时会丢失所有任务。而每个 JobManager 节点中都运行着一个时间轮,我们需要保证这个“分布式时间轮”在服务重启或宕机时,任务能被及时地分发到其它节点。为了解决这个问题,我们在以下的任务监控场景引入了 Job Bucket 的新概念。

任务监控


Job Manager 集群管理着 Jarvis 系统中的所有 Job,为了便于每个节点都均衡地参与到 Job 的监控和管理工作中,上游负责负载均衡,但 Job Manager 还需要先对 Job 进行分配。


对此,我们引入了 Bucket 的概念,类似 Redis Cluster 中的哈希槽。创建 Job 时,会根据 CRC32(JobId) mod BucketCount ,计算出 Job 所映射的 Bucket。Job Manager 节点在启动时会去抢占 Bucket,抢占成功后才会提供监控等服务,否则就会一直尝试抢占。例如,现在有 10 个 bucket,部署了 12 个 Job Manager 节点,这样会有 10 个节点抢到了 Bucket,另外 2 个节点只会提供接口服务,并不实际维护任务。


抢到 Bucket 的节点会监控 Bucket 下绑定的所有 Job,当发现有 Job 在某个状态超时后,会主动执行 kill、释放资源、重新调度等操作。节点需要通过心跳监控对 bucket 续期,当某个节点出现故障时,该 Bucket 会被立即释放,之前未抢到的节点会及时接管该 Bucket。Bucket 的数量是可动态配置的,一般我们会设置节点数为 N。N 代表备用的监控节点,可以自己把控节点数量。除了监控 Job 的状态,Job Manager 中的 Master 节点还会监控 Bucket 的数量,当宕机的节点数超过 N 时,意味着出现了 Bucket 无节点接管的情况。


如果节点宕机,新的节点接管其 Bucket 后,会将该 Bucket 下的定时任务重新加入到自己的时间轮中,这样就保证了定时任务在节点宕机重启时也不会丢失。

DAG 任务编排


DAG 的每个子任务本质都是一个实时任务。在实现上,我们用有向无环 (DAG) 维护了任务间的依赖关系,当子任务执行结束时,通知 DAG 执行一次检查,如果已无可执行的子任务,则 DAG 执行结束。


任务失败策略


一般任务执行失败有以下几种情况:


  • 用户主动 Kill

  • 任务超时后被系统 Kill

  • 任务运行失败


接收到任务失败上报后,Job Manager 会根据创建时指定的 “失败重试次数” 参数,尝试重新调度任务,当重试次数用完后,Job 会被标记为最终态 Failed。

任务结果回调


Jarvis 系统并不关心任务具体业务逻辑的对错,我们只保证任务成功在资源上运行。如果业务方需要拿到任务执行完的结果,可以在业务逻辑中任务结束前调自己的接口。我们也提供了回调机制:在创建 Job 时可以指定 CallBack URL,任务执行结束后,Jarvis 会将任务的输出结果进行回调。

保证某些场景下的任务幂等性


在系统的基本逻辑基础上,任务 (Job) 本身是具有幂等性的,因为任务 (Job) 可以抽象成一个请求,但是因为存在重试机制和补偿机制的缘故,为了避免在这些机制下产生任务 (Job) 被重复执行(即需要保证一个任务只会被一台机器执行一次),Jarvis 在基于 Redis Check + 分布式 ID + CallBack 机制下去保证这些场景下的「任务幂等性」。

Scheduler 模块


Scheduler 的核心功能是对任务进行调度,负责任务在创建后的“绑定资源 -> 指派任务”的过程。

调度策略


Jarvis 使用的调度算法主要有:


  • First Come First Serve


最简单的一个调度算法:先来先服务。维护非抢占式的任务


  • Multi Priority Level Queue


多优先级级队列调度算法。「多级」表示有多个队列,每个队列优先级从高到低。维护抢占式任务。当抢占式任务因资源不足无法执行时,会对低优先级的任务进行抢占,被抢占的任务会走正常的重试逻辑,直到重试次数用完。


  • 时间容忍性调度


除了以上两种调度算法外,我们还基于 ECI 实现了一个对业务场景很实用的 Feature:容忍等待时间。


考虑如下场景:有一些任务的实时性要求不高,在 24 小时内执行完就可以,而我们只有有限的物理机资源。当瞬时创建大量这种任务时,就算全部资源满负荷运转,同时能跑的任务数量也很有限。Scheduler 会将这些任务积压在队列中,在这 24 小时里充分利用资源,如果达到任务要求的可容忍等待时间后,资源仍然不足,Scheduler 就会直接将其调度到 ECI 上执行。


与”分布式时间轮”问题类似,Scheduler 集群各个节点的会维护的队列,也没有备份。但是 Scheduler 中没有 Bucket 的概念,它是完全无状态去中心化的。为了解决这个问题,我们在监听到 Scheduler 退出信号后,会把队列中未处理的 Jobs 信息进行回调,再通过 Job Manager 转发到其它节点。

调度流程


  • Scheduler 从 Job Manager 拉取 Schedulable 状态的 Job。

  • 将 Job 推送给 ResourceManager 尝试绑定资源。

  • 如果绑定资源成功,根据资源信息将 Job 推送到指定的 Worker 上执行。

  • 如果绑定资源失败,走抢占逻辑或者重新入队。

ResourceManager 模块

调度系统现状


Google 的研究工作 1 表明,调度系统经历了从单层调度系统到双层调度系统再到共享状态调度系统的演变过程:


  • 单层调度系统架构简单且单调度器容易保证资源的一致性,但是其并发性能差且存在单点瓶颈问题不适于大规模集群的调度。


  • 双层调度系统将资源管理和任务调度解耦,每个上层调度器都有自己的资源视图且可以自定义任务调度的逻辑。提高了调度系统的灵活性和可扩展性,典型代表如 Mesos 等。双层调度系统虽然提高了扩展性,但是由于上层调度器只有局部资源视图因此任务的资源分配不是全局最优,且任务进行资源抢占时无法跨调度器抢占。虽然可以有多个上层调度器较单层调度器提高了并发性,但实际采用了类似悲观锁的方式并发性仍有待提高。


  • 共享状态调度系统中每个调度器都有全局资源视图,能够实现全局最优调度,且多个调度器可以同时进行调度提高了并发性能,典型代表如 Omega 等。但是并发调度实际是采用了类似乐观锁的并发机制,因此会导致资源分配冲突,若频繁发生冲突而重新进行调度可能影响系统性能。

ResourceManager 处理流程


Jarvis 属于共享状态调度系统其资源管理模块 ResourceManager 负责系统中资源的统一管理和分配,它接受来自 Worker 的资源信息汇报,并把集群中资源按照一定的策略分配给各个任务。ResourceManager 是一个资源管理模块,并不参与任务的具体执行(启动、杀死、重启等),其主要工作包括:Allocate、Report、Release。


  • Allocate: Scheduler 申请给 Job 分配资源,收到请求 ResourceManager 先查询数据库中该 Job 的状态避免重复处理,然后根据分配策略和资源数据进行计算给 Job 分配资源。分配资源后更新 Job 的状态和资源数据,先更新缓存然后用 Pulsar 同步数据到数据库,至此该 Job 已经预占了分配给它的资源状态为 Assumed。

  • Report: ResourceManager 将资源分配结果返回,Scheduler 使用资源来执行 Job 并将 Job 使用资源的结果通知 ResourceManager。Jarvis 通过超时机制来避免 Job 申请资源后长时间不使用,导致资源浪费,资源使用结果上报后 Job 的状态为 Used。

  • Release: Job 执行结束或超时被 kill 后 JobManager 会请求释放该 Job 占用的资源,ResourceManager 收到请求后更新 Job 的状态为 Deleted,并回收 Job 占用的资源更新对应机器的资源数据。

ResourceManager 状态机


 +--------------------------------------------+    |                      Report Exist          |    |                                            |    +      Allocate            Report Success    v     Expire Initial +--------> Assumed +------------+---> Used +--------> Expired  ^                +   +                       +  |                |   |                       |  |                |   |                       | Release  |                |   |                       |  |                |   |                       |  +----------------+   +---------> Deleted <---+        Expire        Report Fail
复制代码


  • Initial: 刚开始调度时资源状态,此时资源还未被分配。也可能是接收到一个已经已经释放资源的任务。Assumed: ResourceManager 完成计算,为任务分配某台机器上的资源。Job 状态置为 Assumed,扣减对应机器的资源,此时 Job 预占了被分配的资源。

  • Used: Scheduler 接收到资源分配结果,启动 Job 后将资源使用结果回调 ResourceManager。若 Job 被成功启动,标记对应资源为已使用,Job 对应资源状态置为 Used,若启动失败则重置为 Initial 并扣减机器资源。

  • Deleted: 任务执行结束(成功或失败),或长期占用资源被 kill,都会进入 Deleted 状态。Job 对应资源状态置为 Deleted,补增对应机器的资源。

  • Expired: 计算调度结果后 Scheduler 超时未返回资源使用结果(丢失等)。Job 对应资源状态置为 Expired,补增对应机器的资源。

一次资源分配处理


ResourceManager 为 Job 分配资源的处理步骤包括:资源数据同步、资源分配、提交资源分配结果

资源数据同步


ResourceManger 模块在接收到任务后会先读取机器最新资源数据,用于资源分配的决策。目前获取资源数据采用的是全量获取的方式,该方式在机器量较少的情况可以适用,但是随着机器资源的不断增多资源同步会因耗时较长而降低系统的调度性能。新版 ResourceManager 已准备采用增量更新的方式来进行资源数据同步,每个 ResourceManger 本地缓存全部资源数据,后续根据时间戳来同步需要更新的数据,该方式在机器资源规模较大时可以保证资源同步的效率从而提高系统的调度效率。

资源分配


资源分配阶段 ResourceManager 会根据同步到的资源数据为任务分配合适的资源,主要包括:筛选、排序、优选 3 个步骤。


  • 筛选是对每台机器进行检测,筛选出满足任务需求的机器(这里机器是一个抽象说法,可以指代物理机、虚拟机 、容器等)。筛选条件包括:


  1. 机器是否支持处理当前类型的任务,判断机器上报数据中支持任务类型的 JobType List 是否包含当前任务的 JobType。JobType 包括 golang、docker、python, http 等且支持自定义任务类型。


  1. 机器是否满足当前任务的资源需求,判断:机器总的可分配资源量 * 机器最大资源使用率 >= 机器已分配资源量 + 任务所需资源量,不满足该条件则淘汰。资源种类包括 CPU、内存、GPU、磁盘等。


  • 排序是对上一步筛选出的机器按照多个资源分配策略打分,对每台机器都会计算一个 0~100 之间的分数,表示当前任务放到该机器的合适程度,其中 100 表示非常合适,0 表示非常不合适。每个不同的策略都有一个权重值,最终的分数为权重和策略计算结果的乘积,而一个机器的分数就是所有策略计算结果的加和。比如有两种优先级函数 priorityStrategy1 和 priorityStrategy2,对应的权重分别为 weight1 和 weight2,那么节点 X 的最终得分是:


finalScoreHostX = (weight1 * priorityStrategy1) + (weight2 * priorityStrategy2)


  • 优选是按照排序的结果选择合适的机器分配给当前任务,这里有两点要考虑:


  1. 选取机器前先打散,确保任务被均匀调度到每台机器上,避免多次选中同一台机器。

  2. 每次选择多台机器其余留作备用,以应对资源分配冲突的情况。


伴鱼的许多业务都具有较为明显的业务高峰时段,例如教室上课、教学音视频录制、音视频转码、实时性数据分析等。接入这些业务后势必会导致调度系统在某个时间段内调度并发数飙升,为保证调度系统较高的吞吐率 ResourceManager 会一次接收多个 Scheduler 投递的任务,并灵活的将同类型 Job 进行资源合并后再进行资源分配。例如:JobA 和 JobB 都需要 1Core 1G ,则先合并为一个 2 Core 2G 的 Job 再进行资源分配。


此外 Jarvis 是一个支持异构资源的调度系统,目前 ResourceManager 管理的机器资源包括物理机集群、K8S 集群、ECI、EKS 等。资源分配时会优先分配公司机器资源,公司机器资源不足时分配 ECI 等资源以保证可弹性扩容。

提交资源分配结果


通过上一步资源分配处理已经确定任务要被分配到哪台机器,ResourceManager 依靠事务来提交资源分配结果以保证资源的一致性:在选定的机器上模拟扣减资源,并再次检测任务类型的满足性、亲和性等。若所有条件满足则进行数据更新,不满足则发生资源分配冲突。为了减少资源分配冲突后二次调度带来的开销,在资源分配冲突时直接从备选机器中选择一台机器进行分配处理。提交事务进行资源扣减,若成功则调度成功,若失败则回滚。


缓存与数据库的数据同步方案:

  • 使用 Redis + Tidb,读数据先读 Redis,如果没有读取到改为从 Tidb 读取并更新到 Redis。


  • 写数据先写 Redis(lua 脚本保证原子性),写入失败,则失败重试,写入成功,则把消息推入 mq。单独启动一个协程从 mq 取数据写入数据库,通过 mq 的重试机制保证消息最终会被写入 Tidb,数据最终一致。


其中修改机器资源的 lua 脚本片段如下:


local hostData = redis.call("HGET", hostDataKey, hostName)if hostData == nil or type(hostData) ~= "string" then    return redisNilend
local jsonHostData = cjson.decode(hostData)
local hostCpu = jsonHostData["hostcpu"]local hostMem = jsonHostData["hostmem"]local allocatedCpu = jsonHostData["allocatedcpu"]local allocatedMem = jsonHostData["allocatedmem"]
if allocatedCpu + jobCpu > hostCpu*maxPercent then return resourceInsufficientend
if allocatedMem + jobMem > hostMem*maxPercent then return resourceInsufficientend
jsonHostData["allocatedcpu"] = allocatedCpu + jobCpujsonHostData["allocatedmem"] = allocatedMem + jobMemjsonHostData["utime"] = tonumber(uTime)
local strHostData = cjson.encode(jsonHostData)
local b = redis.call("HSET", hostDataKey, hostName, strHostData)if b ~= 0 then local d = redis.call("HDEL", hostDataKey, hostName) return redisFailend
return success
复制代码

Worker


Worker 是部署在资源节点上的代理,其核心功能是资源负载上报和管理宿主机执行具体任务。Worker 启动后会将宿主机注册到 Resource Manager 中,并定时上报 CPU、Memory 等信息。Scheduler 会将绑定资源成功的任务 dispatch 到指定的 Worker,Worker 会在宿主机上启动执行对应的脚本或容器。任务结束后,Worker 需要将结果上报给 Job Manager。

使用场景


Jarvis 可以帮助业务方管理不同类型的资源,将任务与业务逻辑解耦,通过我们提供的接口就可以快速创建各种类型的任务。目前直播中台已有多个算法离线分析预测任务正式接入 Jarvis 系统,文章开始提到的录制任务也已经跑通了接入流程,正在逐步把线上流量慢慢迁移到 Jarvis 中。

展望


  • Jarvis 目前已基本实现了我们最初设计的全部核心功能,但仍有一些不足之处,需要后续持续优化改进。


  • Web 界面: 支持业务方可视化管理 Job,可视化编辑 DAG 工作流等。


  • 资源池: 对于实时性要求极高的任务,需要开发资源池减少任务启动时间。

参考资料


  • [1] Omega: flexible, scalable schedulers for large compute clusters

  • [2] Wikipedia — Scheduling

  • [3] Apache Kafka, Purgatory, and Hierarchical Timing Wheels

  • [4] Apache Kafka Timer Implement Source Code


作者:闫云龙、宋园园

原文:https://tech.ipalfish.com/blog/2021/06/07/jarvis/

原文:伴鱼分布式调度系统 Jarvis 的设计与实现

来源:伴鱼技术博客

转载:著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

2021-07-08 14:294624
用户头像
伴鱼技术团队 一群崇尚简单与极致效率的工程师~

发布了 49 篇内容, 共 29.2 次阅读, 收获喜欢 389 次。

关注

评论 8 条评论

发布
用户头像
开源了吗? 写的好详细
2023-03-16 23:10 · 北京
回复
用户头像
被高优先级job抢占后,为什么killed了?
2021-07-09 15:36
回复
被killed后可以释放出资源
2021-07-09 16:12
回复
抢占后一般后续还需要执行吧, 如果是长时间的任务,那中间结果需要保存吗?
2021-07-09 16:18
回复
系统是不负责保存任务上下文的。任务被抢占是因为资源不足且优先级低,如果有需要的话,任务可以在被抢占时自己保存中间状态
2021-07-09 17:20
回复
用户头像
这个Jarvis跟蘑菇街的Jarvis有什么渊源?
2021-07-08 23:12
回复
才知道蘑菇街的也叫Jarvis..
2021-07-09 15:01
回复
没有渊源,纯属巧合
2021-07-09 15:01
回复
没有更多了
发现更多内容
伴鱼分布式调度系统 Jarvis 的设计与实现_AI&大模型_伴鱼技术团队_InfoQ精选文章