本文由于篇幅过长,分文上下两篇,此篇为上篇。
本文主要从源码角度针对 Go 调度相关进行分析,从进程的启动,到调度循环分析,再到分析几个常见 runtime 下的场景可以清晰的了解调度过程。本文仅关注 linux 系统下的逻辑。代码版本参考 Go1.9.2。
阅读索引
1.简单概念
1.1 调度器的三个抽象概念:G、M、P
1.2 调度的大致轮廓
2.进程启动时都做了什么
2.1 runtime.osinit(SB)方法针对系统环境的初始化
2.2 runtime.schedinit(SB)调度相关的一些初始化
2.3 runtime·mainPC(SB)启动监控任务
3.调度循环都做了什么
3.1 调度器如何开启调度循环
3.2 调度器如何进行调度循环
3.3 多个线程下如何调度
4.调度循环中如何让出 CPU
4.1 执行完成让出 CPU
4.2 主动让出 CPU
4.3 抢占让出 CPU
4.4 系统调用让出 CPU
5.待执行 G 的来源
5.1 go func 创建 G
5.2 epoll 来源
6.看几个主动让出 CPU 的场景
6.1 time.Sleep
6.2 sync.Mutex
6.3 channel
调度器的三个抽象概念:G、M、P
G:代表一个 goroutine,每个 goroutine 都有自己独立的栈存放当前的运行内存及状态。可以把一个 G 当做一个任务。
M: 代表内核线程(Pthread),它本身就与一个内核线程进行绑定,goroutine 运行在 M 上。
P:代表一个处理器,可以认为一个“有运行任务”的 P 占了一个 CPU 线程的资源,且只要处于调度的时候就有 P。
注:内核线程和 CPU 线程的区别,在系统里可以有上万个内核线程,但 CPU 线程并没有那么多,CPU 线程也就是 Top 命令里看到的 CPU0、CPU1、CPU2…的数量。
三者关系大致如下图:
图 1、图 2 代表 2 个有运行任务时的状态。M 与一个内核线程绑定,可运行的 goroutine 列表存放到 P 里面,然后占用了一个 CPU 线程来运行。
图 3 代表没有运行任务时的状态,M 依然与一个内核线程绑定,由于没有运行任务因此不占用 CPU 线程,同时也不占用 P。
调度的大致轮廓
图中表述了由 go func 触发的调度。先创建 M 通过 M 启动调度循环,然后调度循环过程中获取 G 来执行,执行过程中遇到图中 running G 后面几个 case 再次进入下一循环。
下面从程序启动、调度循环、G 的来源三个角度分析调度的实现。
进程启动时都做了什么?
下面先看一段程序启动的代码
// runtime/asm_amd64.s
TEXT runtime·rt0_go(SB),NOSPLIT,$0
......此处省略N多代码......
ok:
// set the per-goroutine and per-mach "registers"
get_tls(BX) // 将 g0 放到 tls(thread local storage)里
LEAQ runtime·g0(SB), CX
MOVQ CX, g(BX)
LEAQ runtime·m0(SB), AX
// save m->g0 = g0 // 将全局M0与全局G0绑定
MOVQ CX, m_g0(AX)
// save m0 to g0->m
MOVQ AX, g_m(CX)
CLD // convention is D is always left cleared
CALL runtime·check(SB)
MOVL 16(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 24(SP), AX // copy argv
MOVQ AX, 8(SP)
CALL runtime·args(SB) // 解析命令行参数
CALL runtime·osinit(SB) // 只初始化了CPU核数
CALL runtime·schedinit(SB) // 内存分配器、栈、P、GC回收器等初始化
// create a new goroutine to start program
MOVQ $runtime·mainPC(SB), AX //
PUSHQ AX
PUSHQ $0 // arg size
CALL runtime·newproc(SB) // 创建一个新的G来启动runtime.main
POPQ AX
POPQ AX
// start this M
CALL runtime·mstart(SB) // 启动M0,开始等待空闲G,正式进入调度循环
MOVL $0xf1, 0xf1 // crash
RET
复制代码
在启动过程里主要做了这三个事情(这里只跟调度相关的):
M0 是什么?程序里会启动多个 M,第一个启动的叫 M0。
G0 是什么?G 分三种,第一种是执行用户任务的叫做 G,第二种执行 runtime 下调度工作的叫 G0,每个 M 都绑定一个 G0。第三种则是启动 runtime.main 用到的 G。写程序接触到的基本都是第一种
我们按照顺序看是怎么完成上面三个事情的。
runtime.osinit(SB)方法针对系统环境的初始化
这里实质只做了一件事情,就是获取 CPU 的线程数,也就是 Top 命令里看到的 CPU0、CPU1、CPU2…的数量。
// runtime/os_linux.go
func osinit() {
ncpu = getproccount()
}
复制代码
runtime.schedinit(SB)调度相关的一些初始化
// runtime/proc.go
// 设置最大M数量
sched.maxmcount = 10000
// 初始化当前M,即全局M0
mcommoninit(_g_.m)
// 查看应该启动的P数量,默认为cpu core数.
// 如果设置了环境变量GOMAXPROCS则以环境变量为准,最大不得超过_MaxGomaxprocs(1024)个
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procs > _MaxGomaxprocs {
procs = _MaxGomaxprocs
}
// 调整P数量,此时由于是初始化阶段,所以P都是新建的
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
复制代码
这里 sched.maxmcount 设置了 M 最大的数量,而 M 代表的是系统内核线程,因此可以认为一个进程最大只能启动 10000 个系统线程。
procresize 初始化 P 的数量,procs 参数为初始化的数量,而在初始化之前先做数量的判断,默认是 ncpu(与 CPU 核数相等)。也可以通过环境变量 GOMAXPROCS 来控制 P 的数量。_MaxGomaxprocs 控制了最大的 P 数量只能是 1024。
有些人在进程初始化的时候经常用到 runtime.GOMAXPROCS() 方法,其实也是调用的 procresize 方法重新设置了最大 CPU 使用数量。
runtime·mainPC(SB)启动监控任务
// runtime/proc.go
// The main goroutine.
func main() {
......
// 启动后台监控
systemstack(func() {
newm(sysmon, nil)
})
......
}
复制代码
在 runtime 下会启动一个全程运行的监控任务,该任务用于标记抢占执行过长时间的 G,以及检测 epoll 里面是否有可执行的 G。下面会详细说到。
最后 runtime·mstart(SB)启动调度循环
前面都是各种初始化操作,在这里开启了调度器的第一个调度循环。(这里启动的 M 就是 M0)
下面来围绕 G、M、P 三个概念介绍 Goroutine 调度循环的运作流程。
调度循环都做了什么
图 1 代表 M 启动的过程,把 M 跟一个 P 绑定再一起。在程序初始化的过程中说到在进程启动的最后一步启动了第一个 M(即 M0),这个 M 从全局的空闲 P 列表里拿到一个 P,然后与其绑定。而 P 里面有 2 个管理 G 的链表(runq 存储等待运行的 G 列表,gfree 存储空闲的 G 列表),M 启动后等待可执行的 G。
图 2 代表创建 G 的过程。创建完一个 G 先扔到当前 P 的 runq 待运行队列里。在图 3 的执行过程里,M 从绑定的 P 的 runq 列表里获取一个 G 来执行。当执行完成后,图 4 的流程里把 G 仍到 gfree 队列里。注意此时 G 并没有销毁(只重置了 G 的栈以及状态),当再次创建 G 的时候优先从 gfree 列表里获取,这样就起到了复用 G 的作用,避免反复与系统交互创建内存。
M 即启动后处于一个自循环状态,执行完一个 G 之后继续执行下一个 G,反复上面的图 2~图 4 过程。当第一个 M 正在繁忙而又有新的 G 需要执行时,会再开启一个 M 来执行。
下面详细看下调度循环的实现。
调度器如何开启调度循环
先看一下 M 的启动过程(M0 启动是个特殊的启动过程,也是第一个启动的 M,由汇编实现的初始化后启动,而后续的 M 创建以及启动则是 Go 代码实现)。
// runtime/proc.go
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
if _p_ == nil {
// 从空闲P里获取一个
_p_ = pidleget()
......
}
// 获取一个空闲的m
mp := mget()
unlock(&sched.lock)
// 如果没有空闲M,则new一个
if mp == nil {
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
newm(fn, _p_)
return
}
......
// 唤醒M
notewakeup(&mp.park)
}
func newm(fn func(), _p_ *p) {
// 创建一个M对象,且与P关联
mp := allocm(_p_, fn)
// 暂存P
mp.nextp.set(_p_)
mp.sigmask = initSigmask
......
execLock.rlock() // Prevent process clone.
// 创建系统内核线程
newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
execLock.runlock()
}
// runtime/os_linux.go
func newosproc(mp *m, stk unsafe.Pointer) {
// Disable signals during clone, so that the new thread starts
// with signals disabled. It will enable them in minit.
var oset sigset
sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
sigprocmask(_SIG_SETMASK, &oset, nil)
}
func allocm(_p_ *p, fn func()) *m {
......
mp := new(m)
mp.mstartfn = fn // 设置启动函数
mcommoninit(mp) // 初始化m
// 创建g0
// In case of cgo or Solaris, pthread_create will make us a stack.
// Windows and Plan 9 will layout sched stack on OS stack.
if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" {
mp.g0 = malg(-1)
} else {
mp.g0 = malg(8192 * sys.StackGuardMultiplier)
}
// 把新创建的g0与M做关联
mp.g0.m = mp
......
return mp
}
func mstart() {
......
mstart1()
}
func mstart1() {
......
// 进入调度循环(阻塞不返回)
schedule()
}
复制代码
非 M0 的启动首先从 startm 方法开始启动,要进行调度工作必须有调度处理器 P,因此先从空闲的 P 链表里获取一个 P,在 newm 方法创建一个 M 与 P 绑定。
newm 方法中通过 newosproc 新建一个内核线程,并把内核线程与 M 以及 mstart 方法进行关联,这样内核线程执行时就可以找到 M 并且找到启动调度循环的方法。最后 schedule 启动调度循环
allocm 方法中创建 M 的同时创建了一个 G 与自己关联,这个 G 就是我们在上面说到的 g0。为什么 M 要关联一个 g0?因为 runtime 下执行一个 G 也需要用到栈空间来完成调度工作,而拥有执行栈的地方只有 G,因此需要为每个执行线程里配置一个 g0。
调度器如何进行调度循环
调用 schedule 进入调度器的调度循环后,在这个方法里永远不再返回。下面看下实现。
// runtime/proc.go
func schedule() {
_g_ := getg()
// 进入gc MarkWorker 工作模式
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
}
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
// 每处理n个任务就去全局队列获取G任务,确保公平
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 从P本地获取
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
// 从其它地方获取G,如果获取不到则沉睡M,并且阻塞在这里,直到M被再次使用
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
......
// 执行找到的G
execute(gp, inheritTime)
}
// 从P本地获取一个可运行的G
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// If there's a runnext, it's the next G to run.
// 优先从runnext里获取一个G,如果没有则从runq里获取
for {
next := _p_.runnext
if next == 0 {
break
}
if _p_.runnext.cas(next, 0) {
return next.ptr(), true
}
}
// 从队头获取
for {
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
if atomic.Cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}
// 从其它地方获取G
func findrunnable() (gp *g, inheritTime bool) {
......
// 从本地队列获取
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// 全局队列获取
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 从epoll里取
if netpollinited() && sched.lastpoll != 0 {
if gp := netpoll(false); gp != nil { // non-blocking
......
return gp, false
}
}
......
// 尝试4次从别的P偷
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
// 在这里开始针对P进行偷取操作
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
}
// 尝试从全局runq中获取G
// 在"sched.runqsize/gomaxprocs + 1"、"max"、"len(_p_.runq))/2"三个数字中取最小的数字作为获取的G数量
func globrunqget(_p_ *p, max int32) *g {
if sched.runqsize == 0 {
return nil
}
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max {
n = max
}
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}
sched.runqsize -= n
if sched.runqsize == 0 {
sched.runqtail = 0
}
gp := sched.runqhead.ptr()
sched.runqhead = gp.schedlink
n--
for ; n > 0; n-- {
gp1 := sched.runqhead.ptr()
sched.runqhead = gp1.schedlink
runqput(_p_, gp1, false) // 放到本地P里
}
return gp
}
复制代码
schedule 中首先尝试从 P 本地队列中获取(runqget)一个可执行的 G,如果没有则从其它地方获取(findrunnable),最终通过 execute 方法执行 G。
runqget 先通过 runnext 拿到待运行 G,没有的话,再从 runq 里面取。
findrunnable 从全局队列、epoll、别的 P 里获取。(后面会扩展分析实现)
在调度的开头出还做了一个小优化:每处理一些任务之后,就优先从全局队列里获取任务,以保障公平性,防止由于每个 P 里的 G 过多,而全局队列里的任务一直得不到执行机会。
这里用到了一个关键方法 getg(),runtime 的代码里大量使用该方法,它由汇编实现,该方法就是获取当前运行的 G,具体实现不再这里阐述。
多个线程下如何调度
抛出一个问题:每个 P 里面的 G 执行时间是不可控的,如果多个 P 同时在执行,会不会出现有的 P 里面的 G 执行不完,有的 P 里面几乎没有 G 可执行呢?
这就要从 M 的自循环过程中如何获取 G、归还 G 的行为说起了,先看图:
图中可以看出有两种途径:1.借助全局队列 sched.runq 作为中介,本地 P 里的 G 太多的话就放全局里,G 太少的话就从全局取。2.全局列表里没有的话直接从 P1 里偷取(steal)。(更多 M 在执行的话,同样的原理,这里就只拿 2 个来举例)
第 1 种途径实现如下:
// runtime/proc.go
func runqput(_p_ *p, gp *g, next bool) {
if randomizeScheduler && next && fastrand()%2 == 0 {
next = false
}
// 尝试把G添加到P的runnext节点,这里确保runnext只有一个G,如果之前已经有一个G则踢出来放到runq里
if next {
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// 把老的g踢出来,在下面放到runq里
gp = oldnext.ptr()
}
retry:
// 如果_p_.runq队列不满,则放到队尾就结束了。
// 试想如果不放到队尾而放到队头里会怎样?如果频繁的创建G则可能后面的G总是不被执行,对后面的G不公平
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
//如果队列满了,尝试把G和当前P里的一部分runq放到全局队列
//因为操作全局需要加锁,所以名字里带个slow
if runqputslow(_p_, gp, h, t) {
return
}
// the queue is not full, now the put above must succeed
goto retry
}
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
var batch [len(_p_.runq)/2 + 1]*g
// First, grab a batch from local queue.
n := t - h
n = n / 2
if n != uint32(len(_p_.runq)/2) {
throw("runqputslow: queue is not full")
}
// 从runq头部开始取出一半的runq放到临时变量batch里
for i := uint32(0); i < n; i++ {
batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
}
if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return false
}
// 把要put的g也放进batch去
batch[n] = gp
if randomizeScheduler {
for i := uint32(1); i <= n; i++ {
j := fastrandn(i + 1)
batch[i], batch[j] = batch[j], batch[i]
}
}
// 把取出来的一半runq组成链表
for i := uint32(0); i < n; i++ {
batch[i].schedlink.set(batch[i+1])
}
// 将一半的runq放到global队列里,一次多转移一些省得转移频繁
lock(&sched.lock)
globrunqputbatch(batch[0], batch[n], int32(n+1))
unlock(&sched.lock)
return true
}
func globrunqputbatch(ghead *g, gtail *g, n int32) {
gtail.schedlink = 0
if sched.runqtail != 0 {
sched.runqtail.ptr().schedlink.set(ghead)
} else {
sched.runqhead.set(ghead)
}
sched.runqtail.set(gtail)
sched.runqsize += n
}
复制代码
runqput 方法归还执行完的 G,runq 定义是 runq [256]guintptr,有固定的长度,因此当前 P 里的待运行 G 超过 256 的时候说明过多了,则执行 runqputslow 方法把一半 G 扔给全局 G 链表,globrunqputbatch 连接全局链表的头尾指针。
但可能别的 P 里面并没有超过 256,就不会放到全局 G 链表里,甚至可能一直维持在不到 256 个。这就借助第 2 个途径了:
第 2 种途径实现如下:
// runtime/proc.go
// 从其它地方获取G
func findrunnable() (gp *g, inheritTime bool) {
......
// 尝试4次从别的P偷
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
// 在这里开始针对P进行偷取操作
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
}
复制代码
从别的 P 里面"偷取"一些 G 过来执行了。runqsteal 方法实现了"偷取"操作。
// runtime/proc.go
// 偷取P2一半到本地运行队列,失败则返回nil
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
t := _p_.runqtail
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
if n == 0 {
return nil
}
n--
// 返回尾部的一个G
gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
if n == 0 {
return gp
}
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
if t-h+n >= uint32(len(_p_.runq)) {
throw("runqsteal: runq overflow")
}
atomic.Store(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
return gp
}
// 从P里获取一半的G,放到batch里
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
// 计算一半的数量
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := atomic.Load(&_p_.runqtail) // load-acquire, synchronize with the producer
n := t - h
n = n - n/2
......
// 将偷到的任务转移到本地P队列里
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
if atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}
}
复制代码
上面可以看出从别的 P 里面偷(steal)了一半,这样就足够运行了。有了“偷取”操作也就充分利用了多线程的资源。
评论 3 条评论