写点什么

Golang 并发编程与定时器

  • 2019-12-03
  • 本文字数:7053 字

    阅读完需:约 23 分钟

Golang 并发编程与定时器

5.3 定时器

对于任何一个正在运行的应用,如何获取准确的绝对时间都非常重要,但是在一个分布式系统中我们很难保证各个节点上绝对时间的一致性,哪怕通过 NTP 这种标准的对时协议也只能把时间的误差控制在毫秒级,所以相对时间在一个分布式系统中显得更为重要,我们在这一节中就会介绍 Go 语言中的定时器以及它在并发编程中起到什么样的作用。


绝对时间一定不会是完全准确的,它对于一个运行中的分布式系统其实没有太多指导意义,但是由于相对时间的计算不依赖于外部的系统,所以它的计算可以做的比较准确,我们在这一节中就会介绍 Go 语言中用于计算相对时间的定时器的实现原理。

__1. 结构

timer 就是 Golang 定时器的内部表示,每一个 timer 其实都存储在堆中,tb 就是用于存储当前定时器的桶,而 i 是当前定时器在堆中的索引,我们可以通过这两个变量找到当前定时器在堆中的位置:


type timer struct {    tb *timersBucket    i  int
when int64 period int64 f func(interface{}, uintptr) arg interface{} seq uintptr}
复制代码


when 表示当前定时器(Timer)被唤醒的时间,而 period 表示两次被唤醒的间隔,每当定时器被唤醒时都会调用 f(args, now) 函数并传入 args 和当前时间作为参数。然而这里的 timer 作为一个私有结构体其实只是定时器的运行时表示,time 包对外暴露的定时器使用了如下所示的结构体:


type Timer struct {    C <-chan Time    r runtimeTimer}
复制代码


Timer 定时器必须通过 NewTimer 或者 AfterFunc 函数进行创建,其中的 runtimeTimer 其实就是上面介绍的 timer 结构体,当定时器失效时,失效的时间就会被发送给当前定时器持有的 Channel C,订阅管道中消息的 Goroutine 就会收到当前定时器失效的时间。


time 包中,除了 timerTimer 两个分别用于表示运行时定时器和对外暴露的 API 之外,timersBucket 这个用于存储定时器的结构体也非常重要,它会存储一个处理器上的全部定时器,不过如果当前机器的核数超过了 64 核,也就是机器上的处理器 P 的个数超过了 64 个,多个处理器上的定时器就可能存储在同一个桶中:


type timersBucket struct {    lock         mutex    gp           *g    created      bool    sleeping     bool    rescheduling bool    sleepUntil   int64    waitnote     note    t            []*timer}
复制代码


每一个 timersBucket 中的 t 就是用于存储定时器指针的切片,每一个运行的 Go 语言程序都会在内存中存储着 64 个桶,这些桶中都存储定时器的信息:



每一个桶持有的 timer 切片其实都是一个最小堆,这个最小堆会按照 timer 应该触发的时间对它们进行排序,最小堆最上面的定时器就是最近需要被唤醒的 timer,我们会在下面展开介绍定时器的创建和触发过程。

__2. 工作原理

既然我们已经介绍了定时器的数据结构,接下来我们就可以开始分析它的常见操作以及工作原理了,在这一节中我们将介绍定时器的创建、触发、time.Sleep 与定时器的关系以及计时器 Ticker 的实现原理。

__2.1. 创建

time 包对外提供了两种创建定时器的方法,第一种方法就是 NewTimer 接口,这个接口会创建一个用于通知触发时间的 Channel、调用 startTimer 方法并返回一个创建指向 Timer 结构体的指针:


func NewTimer(d Duration) *Timer {    c := make(chan Time, 1)    t := &Timer{        C: c,        r: runtimeTimer{            when: when(d),            f:    sendTime,            arg:  c,        },    }    startTimer(&t.r)    return t}
复制代码


另一个用于创建 Timer 的方法 AfterFunc 其实也提供了非常相似的结构,与 NewTimer 方法不同的是该方法没有创建一个用于通知触发时间的 Channel,它只会在定时器到期时调用传入的方法:


func AfterFunc(d Duration, f func()) *Timer {    t := &Timer{        r: runtimeTimer{            when: when(d),            f:    goFunc,            arg:  f,        },    }    startTimer(&t.r)    return t}
复制代码


startTimer 基本上就是创建定时器的入口了,所有定时器的创建和重启基本上都需要调用该函数:


func startTimer(t *timer) {    addtimer(t)}
func addtimer(t *timer) { tb := t.assignBucket() tb.addtimerLocked(t)}
复制代码


它会调用 addTimer 函数,这个函数总共做了两件事情,首先通过 assignBucket 方法为当前定时器选择一个 timersBucket 桶,我们会根据当前 Goroutine 所在处理器 P 的 id 选择一个合适的桶,随后调用 addTimerLocked 方法将当前定时器加入桶中:


func (tb *timersBucket) addtimerLocked(t *timer) bool {    t.i = len(tb.t)    tb.t = append(tb.t, t)    if !siftupTimer(tb.t, t.i) {        return false    }    if t.i == 0 {        if tb.sleeping && tb.sleepUntil > t.when {            tb.sleeping = false            notewakeup(&tb.waitnote)        }        if tb.rescheduling {            tb.rescheduling = false            goready(tb.gp, 0)        }        if !tb.created {            tb.created = true            go timerproc(tb)        }    }    return true}
复制代码


addtimerLocked 会先将最新加入的定时器加到队列的末尾,随后调用 siftupTimer 将当前定时器与四叉树(或者四叉堆)中的父节点进行比较,保证父节点的到期时间一定小于子节点:



这个四叉树只能保证父节点的到期时间大于子节点,这对于我们来说其实也足够了,因为我们只关心即将被触发的计数器,如果当前定时器是第一个被加入四叉树的定时器,我们还会通过 go timerproc(tb) 启动一个 Goroutine 用于处理当前树中的定时器,这也是处理定时器的核心方法。

__2.2. 触发

定时器的触发都是由 timerproc 中的一个双层 for 循环控制的,外层的 for 循环主要负责对当前 Goroutine 进行控制,它不仅会负责锁的获取和释放,还会在合适的时机触发当前 Goroutine 的休眠:


func timerproc(tb *timersBucket) {    tb.gp = getg()    for {        tb.sleeping = false        now := nanotime()        delta := int64(-1)
// inner loop
if delta < 0 { tb.rescheduling = true goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1) continue } tb.sleeping = true tb.sleepUntil = now + delta noteclear(&tb.waitnote) notetsleepg(&tb.waitnote, delta) }}
复制代码


如果距离下一个定时器被唤醒的时间小于 0,当前的 timerproc 就会将 rescheduling 标记设置成 true 并立刻陷入休眠,这其实也意味着当前 timerproc 中不包含任何待处理的定时器,当我们再向该 timerBucket 加入定时器时就会重新唤醒 timerproc Goroutine。


在其他情况下,也就是下一次计数器的响应时间是 now + delta 时,timerproc 中的外层循环会通过 notesleepg 将当前 Goroutine 陷入休眠。


func notetsleepg(n *note, ns int64) bool {    gp := getg()    if gp == gp.m.g0 {        throw("notetsleepg on g0")    }    semacreate(gp.m)    entersyscallblock()    ok := notetsleep_internal(n, ns, nil, 0)    exitsyscall()    return ok}
复制代码


该函数会先获取当前的 Goroutine 并在当前的『CPU 上』创建一个信号量,随后在 entersyscallblockexitsyscall 之间执行系统调用让当前的 Goroutine 陷入休眠并在 ns 纳秒后返回。


内部循环的主要作用就是触发已经到期的定时器,在这个内部循环中,我们会按照以下的流程对当前桶中的定时器进行处理:


  1. 如果桶中不包含任何定时器就会直接返回并陷入休眠等待定时器加入当前桶;

  2. 如果四叉树最上面的定时器还没有到期会通过 notetsleepg 方法陷入休眠等待最近定时器的到期;

  3. 如果四叉树最上面的定时器已经到期;

  4. 当定时器的 period > 0 就会设置下一次会触发定时器的时间并将当前定时器向下移动到对应的位置;

  5. 当定时器的 period <= 0 就会将当前定时器从四叉树中移除;

  6. 在每次循环的最后都会从定时器中取出定时器中的函数、参数和序列号并调用函数触发该计数器;


for {            if len(tb.t) == 0 {                delta = -1                break            }            t := tb.t[0]            delta = t.when - now            if delta > 0 {                break            }            ok := true            if t.period > 0 {                t.when += t.period * (1 + -delta/t.period)                if !siftdownTimer(tb.t, 0) {                    ok = false                }            } else {                last := len(tb.t) - 1                if last > 0 {                    tb.t[0] = tb.t[last]                    tb.t[0].i = 0                }                tb.t[last] = nil                tb.t = tb.t[:last]                if last > 0 {                    if !siftdownTimer(tb.t, 0) {                        ok = false                    }                }                t.i = -1 // mark as removed            }            f := t.f            arg := t.arg            seq := t.seq            f(arg, seq)        }
复制代码


使用 NewTimer 创建的定时器,传入的函数时 sendTime,它会将当前时间发送到定时器持有的 Channel 中,而使用 AfterFunc 创建的定时器,在内层循环中调用的函数就会是调用方传入的函数了。

__2.3. 休眠

如果你使用过一段时间的 Go 语言,你一定在项目中使用过 time 包中的 Sleep 方法让当前的 Goroutine 陷入休眠以等待某些条件的完成或者触发一些定时任务,time.Sleep 就是通过如下所示的 timeSleep 方法完成的:


func timeSleep(ns int64) {    if ns <= 0 {        return    }
gp := getg() t := gp.timer if t == nil { t = new(timer) gp.timer = t } *t = timer{} t.when = nanotime() + ns t.f = goroutineReady t.arg = gp tb := t.assignBucket() lock(&tb.lock) if !tb.addtimerLocked(t) { unlock(&tb.lock) badTimer() } goparkunlock(&tb.lock, waitReasonSleep, traceEvGoSleep, 2)}
复制代码


timeSleep 会创建一个新的 timer 结构体,在初始化的过程中我们会传入当前 Goroutine 应该被唤醒的时间以及唤醒时需要调用的函数 goroutineReady,随后会调用 goparkunlock 将当前 Goroutine 陷入休眠状态,当定时器到期时也会调用 goroutineReady 方法唤醒当前的 Goroutine:


func goroutineReady(arg interface{}, seq uintptr) {    goready(arg.(*g), 0)}
复制代码


time.Sleep 方法其实只是创建了一个会在到期时唤醒当前 Goroutine 的定时器并通过 goparkunlock 将当前的协程陷入休眠状态等待定时器触发的唤醒。

__2.4. Ticker

除了只用于一次的定时器(Timer)之外,Go 语言的 time 包中还提供了用于多次通知的 Ticker 计时器,计时器中包含了一个用于接受通知的 Channel 和一个定时器,这两个字段共同组成了用于连续多次触发事件的计时器:


type Ticker struct {    C <-chan Time // The channel on which the ticks are delivered.    r runtimeTimer}
复制代码


想要在 Go 语言中创建一个计时器只有两种方法,一种是使用 NewTicker 方法显示地创建Ticker 计时器指针,另一种可以直接通过 Tick 方法获取一个会定期发送消息的 Channel:


func NewTicker(d Duration) *Ticker {    if d <= 0 {        panic(errors.New("non-positive interval for NewTicker"))    }    c := make(chan Time, 1)    t := &Ticker{        C: c,        r: runtimeTimer{            when:   when(d),            period: int64(d),            f:      sendTime,            arg:    c,        },    }    startTimer(&t.r)    return t}
func Tick(d Duration) <-chan Time { if d <= 0 { return nil } return NewTicker(d).C}
复制代码


Tick 其实也只是对 NewTicker 的简单封装,从实现上我们就能看出来它其实就是调用了 NewTicker 获取了计时器并返回了计时器中 Channel,两个创建计时器的方法的实现都并不复杂而且费容易理解,所以在这里也就不详细展开介绍了。


需要注意的是每一个 NewTicker 方法开启的计时器都需要在不需要使用时调用 Stop 进行关闭,如果不显示调用 Stop 方法,创建的计时器就没有办法被垃圾回收,而通过 Tick 创建的计时器由于只对外提供了 Channel,所以是一定没有办法关闭的,我们一定要谨慎使用这一接口创建计时器。

__3. 性能分析

定时器在内部使用四叉树的方式进行实现和存储,当我们在生产环境中使用定时器进行毫秒级别的计时时,在高并发的场景下会有比较明显的性能问题,我们可以通过实验测试一下定时器在高并发时的性能,假设我们有以下的代码:


func runTimers(count int) {    durationCh := make(chan time.Duration, count)
wg := sync.WaitGroup{} wg.Add(count) for i := 0; i < count; i++ { go func() { startedAt := time.Now() time.AfterFunc(10*time.Millisecond, func() { defer wg.Done() durationCh <- time.Since(startedAt) }) }()
} wg.Wait()
close(durationCh)
durations := []time.Duration{} totalDuration := 0 * time.Millisecond for duration := range durationCh { durations = append(durations, duration) totalDuration += duration } averageDuration := totalDuration / time.Duration(count) sort.Slice(durations, func(i, j int) bool { return durations[i] < durations[j] })
fmt.Printf("run %v timers with average=%v, pct50=%v, pct99=%v\n", count, averageDuration, durations[count/2], durations[int(float64(count)*0.99)])}
复制代码


完整的性能测试代码可以在 benchmark_timers.go 中找到,需要注意的是:由于机器和性能的不同,多次运行测试可能会有不一样的结果。


这段代码开了 N 个 Goroutine 并在每一个 Goroutine 中运行一个定时器,我们会在定时器到期时将开始计时到定时器到期所用的时间加入 Channel 并用于之后的统计,在函数的最后我们会计算出 N 个 Goroutine 中定时器到期时间的平均数、50 分位数和 99 分位数:


$ go test ./... -v=== RUN   TestTimersrun 1000 timers with average=10.367111ms, pct50=10.234219ms, pct99=10.913219msrun 2000 timers with average=10.431598ms, pct50=10.37367ms, pct99=11.025823msrun 5000 timers with average=11.873773ms, pct50=11.986249ms, pct99=12.673725msrun 10000 timers with average=11.954716ms, pct50=12.313613ms, pct99=13.507858msrun 20000 timers with average=11.456237ms, pct50=10.625529ms, pct99=25.246254msrun 50000 timers with average=21.223818ms, pct50=14.792982ms, pct99=34.250143msrun 100000 timers with average=36.010924ms, pct50=31.794761ms, pct99=128.089527msrun 500000 timers with average=176.676498ms, pct50=138.238588ms, pct99=676.967558ms--- PASS: TestTimers (1.21s)
复制代码


我们将上述代码输出的结果绘制成如下图所示的折线图,其中横轴是并行定时器的个数,纵轴表示定时器从开始到触发时间的差值,三个不同的线分别表示时间的平均值、50 分位数和 99 分位数:



虽然测试的数据可能有一些误差,但是从图中我们也能得出一些跟定时器性能和现象有关的结论:


  • 定时器触发的时间一定会晚于创建时传入的时间,假设定时器需要等待 10ms 触发,那它触发的时间一定是晚于 10ms 的;

  • 当并发的定时器数量达到 5000 时,定时器的平均误差达到了 ~18%,99 分位数上的误差达到了 ~26%;

  • 并发定时器的数量超过 5000 之后,定时器的误差就变得非常明显,不能有效、准确地完成计时任务;


这其实也是因为定时器从开始到触发的时间间隔非常短,当我们将计时的时间改到 100ms 时就会发现性能问题有比较明显的改善:



哪怕并行运行了 10w 个定时器,99 分位数的误差也只有 ~12%,我们其实能够发现 Go 语言标准库中的定时器在计时时间较短并且并发较高时有着非常明显的问题,所以在一些性能非常敏感的基础服务中使用定时器一定要非常注意 —— 它可能达不到我们预期的效果。


不过哪怕我们不主动使用定时器,而是使用 context.WithDeadline 这种方法,由于它底层也会使用定时器实现,所以仍然会受到影响。

__4. 总结

Go 语言的定时器在并发编程起到了非常重要的作用,它能够为我们提供比较准确的相对时间,基于它的功能,标准库中还提供了计时器、休眠等接口能够帮助我们在 Go 语言程序中更好地处理过期和超时等问题。


标准库中的定时器在大多数情况下是能够正常工作并且高效完成任务的,但是在遇到极端情况或者性能敏感场景时,它可能没有办法胜任,而在 10ms 的这个粒度下,作者在社区中也没有找到能够使用的定时器实现,一些使用时间轮算法的开源库也不能很好地完成这个任务。

__5. Reference

__6. 其他

__6.1. 关于图片和转载

本文转载自 Draveness 技术博客。


原文链接:https://draveness.me/golang/concurrency/golang-timer.html


2019-12-03 15:092004

评论

发布
暂无评论
发现更多内容

手把手入门MO | 如何使用SeaTunnel将数据写入MatrixOne

MatrixOrigin

分布式数据库 云原生数据库 MatrixOrigin MatrixOne 超融合数据库

3D建模工具Archicad 26 完美激活版下载

iMac小白

EndNote 21更新 最新EndNote 21mac破解版下载 支持MacOS14

iMac小白

Util应用框架基础(七)- 本地缓存

何镇汐

开源 后端 .net core

MatrixOne实战系列回顾 | 导入导出项目场景实践

MatrixOrigin

分布式数据库 云原生数据库 MatrixOrigin MatrixOne 超融合数据库

将 Spring 微服务与 BI 工具集成:最佳实践

互联网工科生

spring 微服务 BI 分析工具

Past for mac好用的苹果电脑剪切板工具下载

iMac小白

Util应用框架基础(七)- 缓存

何镇汐

开源 后端 .net core

IBM SPSS Statistics mac激活破解版下载

iMac小白

科创人·蓝凌董事长杨健伟:夯实“四梁八柱”,让数字化“城中村上建高楼”

科创人

数字化转型 企业家精神

模块一作业

闻明杨

架构实战营

Permute 3 for mac(音视频转换器) 3.11.2中文版

iMac小白

企业app软件定制开发的重点是什么?|小程序网站搭建

Geek_16d138

网站建设 小程序开发 app定制开发

如何将Docker的构建时间减少40%

高端章鱼哥

Docker 镜像

AI,正在「整顿」企业面试

用友BIP

KeyShot 2023.3 Pro for mac激活版下载(3D渲染和动画制作软件)

iMac小白

Camunda、Activiti、Flowable等各大工作流对比选择

小狗围观科幻

矢量图形编辑软件Sketch for mac完美破解版下载

iMac小白

影视行业如何远程完整快速传输大文件?

镭速

传输大文件

什么是Mock?为什么要使用Mock呢?

我爱娃哈哈😍

Mock Mock 服务 mock设计

企业软件定制开发的重点是什么?|app小程序网站建设

Geek_16d138

APP开发 软件定制

区块链开发公司

区块链技术

mac电脑音乐创作软件Ableton Live Suite 11中文破解版下载

iMac小白

软件测试/人工智能丨视觉与图像识别在自动化测试领域的应用

测试人

人工智能 软件测试

Util应用框架基础(七)- Redis 缓存

何镇汐

开源 后端 .net core

报名仅剩一周!课程直播和1V1指导助力文心一言插件开发赛事冲榜

飞桨PaddlePaddle

插件开发 文心一言 开发者插件

Util应用框架基础(七)- 二级缓存

何镇汐

开源 后端 .net core

关联规则挖掘:Apriori算法的深度探讨

不在线第一只蜗牛

Python 大数据 算法

Logic Pro X for Mac v10.8.0中文直装版下载

iMac小白

加速CI构建,实现高效流水线——CloudBees CI发布工作区缓存功能

龙智—DevSecOps解决方案

ci

JProfiler 14 for Mac(Java开发分析工具) 14.0激活版

iMac小白

Golang 并发编程与定时器_文化 & 方法_Draveness_InfoQ精选文章