go并发调度模型如上图
M指的是Machine,一个M直接关联了一个线程。
P指的是Processor,代表了M所需的上下文环境,也是处理用户级代码逻辑的处理器。
G指的是Goroutine,其实本质上也是一种轻量级的线程。
⾸先是 Processor(简称 P),其作⽤类似 CPU 核,⽤来控制可同时并发执⾏的任务数量。每个⼯作线程都必须绑定⼀个有效 P 才被允许执⾏任务,否则只能休眠,直到有空闲 P 时被唤醒。P 还为线程提供执⾏资源,⽐如对象分配内存、本地任务队列等。线程独享所绑定的 P 资源,可在⽆锁状态下执⾏⾼效操作。
进程内的⼀切都在以G⽅式运⾏,包括运⾏时相关服务,以及main.main ⼊口函数。需要指出,G 并⾮执⾏体,它仅仅保存并发任务状态,为任务执⾏提供所需栈内存空间。G 任务创建后被放置在 P 本地队列或全局队列,等待⼯作线程调度执⾏。
实际执⾏体是系统线程(简称 M),它和 P 绑定,以调度循环⽅式不停执⾏ G 并发任务。M 通过修改寄存器,将执⾏栈指向 G ⾃带栈内存,并在此空间内分配堆栈帧,执⾏任函数。当需要中途切换时,只要将相关寄存器值保存回 G 空间即可维持状态,任何 M 都可据此恢复执⾏。线程仅负责执⾏,不再持有状态,这是并发任务跨线程调度,实现多路复⽤的根本所在。
G自己提供内存栈在M上执行
P保存P执行过程中的数据行,当G被暂停时,SP,SC等寄存器信息会保存在G.sched中,当G被唤醒继续执行时,从之前暂停的位置继续执行,因为G提供内存栈,并记录了上次执行到的位置,G数量很多,P相对较少,在垃圾回收的时候方便定位
P中有一个对列保存G的指针,其实就是一个256个元素的数组,通过两个变量指向对首和对尾,所以这个队列是会出现满的情况的,满了新加的G就只能放到全局队列中
type g struct { stack stack //栈,两个能容纳任何变量地址的变量 stackguard0 uintptr // offset known to liblink stackguard1 uintptr // offset known to liblink _panic *_panic // innermost panic - offset known to liblink _defer *_defer // innermost defer m *m // current m; offset known to arm liblink sched gobuf //存放g上下文信息,g被停止调度时,会将上线文信息存在这里,唤醒后可继续调度 syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc stktopsp uintptr // expected sp at top of stack, to check in traceback param unsafe.Pointer // passed parameter on wakeup atomicstatus uint32 stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus goid int64 //就像线程有id,g也有id waitsince int64 // approx time when the g become blocked waitreason string // if status==Gwaiting schedlink guintptr //指向另一个G,全局G就是通过这个字段连在一起的 preempt bool // preemption signal, duplicates stackguard0 = stackpreempt paniconfault bool // panic (instead of crash) on unexpected fault address preemptscan bool // preempted g does scan for gc gcscandone bool // g has scanned stack; protected by _Gscan bit in status gcscanvalid bool // false at start of gc cycle, true if G has not run since last scan; TODO: remove? throwsplit bool // must not split stack raceignore int8 // ignore race detection events sysblocktraced bool // StartTrace has emitted EvGoInSyscall about this goroutine sysexitticks int64 // cputicks when syscall has returned (for tracing) traceseq uint64 // trace event sequencer tracelastp puintptr // last P emitted an event for this goroutine lockedm *m sig uint32 writebuf []byte sigcode0 uintptr sigcode1 uintptr sigpc uintptr gopc uintptr // pc of go statement that created this goroutine startpc uintptr // 被执行的函数 racectx uintptr waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order cgoCtxt []uintptr // cgo traceback context labels unsafe.Pointer // profiler labels timer *timer // cached timer for time.Sleep gcAssistBytes int64 }
go func()到底做了什么?
对应函数runtime.newproc
1:从执行当前方法的G所在P的空闲G列表中取一个G,如果没有就从全局list中取一个,毕竟G还是经常使用,用完的G并不是马上释放,而是放回P的空闲列表中反复利用,如果还是没有空闲的G,就new一个malg(2048),G的栈大小为2K
2:如果有参数会将参数拷贝到G的栈上,将G状态改成可运行状态
3:如果P的G队列没满,将G加入队尾
4:如果P的G队列满了,就取出G队列的前面一半+当前G,共129个G加入全局队列
加入队列后,等待被调度
全局队列G存取
G本身有个字段schedlink指向另一个G,天生就是链表的一个节点,全局队列其实就是两个指针,一个指向队首,一个指向队尾,队尾的存在就是方便入队列
入全局队列:前面说过将P中的一半+1个G(129)加入全局队列,并不是一个个入队列,而是将这个129个G的首接入全局队列的尾,将全局队列的尾改成这129个G的尾
出全局队列:当系统开始调度的时候,会从P本地G队列取一个可用G执行,如果没有,则从全局队列中取,最多取128个,返回第一个用于执行,剩余的存入本地G队列中,毕竟操作本地队列不用加锁,操作全局队列需要加锁
findrunnable查找可执行的G
1:本地队列:从M对应的P的G队列中找(runqget),队列不为空,返回对列首个元素,对首指针指向下一个元素,当对首和对尾指向同一个元素时表示队列为空,访问本地队列中的G不需要加锁
2:全局队列:从全局队列中找(globrunqget),从全局队列中取G不是一次取一个,毕竟访问全局队列是要加锁的,所以全局队列有多少取多少,最多取P队列容量一半128个,将这些G存入P的G队列中
3:⽹络任务(netpoll)
4:从其他P任务队列取,拿一半
所有目的就是多核齐心协力以最快的速度完成任务,总不能出现某个P的本地队列还有多个人,其他P都在睡大觉吧,最后如果还是没找到一个可用的G,那就大家一起睡大觉,等着被叫醒
type p struct { lock mutex id int32 status uint32 // one of pidle/prunning/... link puintptr schedtick uint32 // incremented on every scheduler call syscalltick uint32 // incremented on every system call sysmontick sysmontick // last tick observed by sysmon m muintptr // back-link to associated m (nil if idle) mcache *mcache //方便小对象的分配,一个p一个,不需要加锁 racectx uintptr deferpool [5][]*_defer // pool of available defer structs of different sizes (see panic.go) deferpoolbuf [5][32]*_defer // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen. goidcache uint64 goidcacheend uint64 // Queue of runnable goroutines. Accessed without lock. runqhead uint32 //队头 runqtail uint32 //队尾 runq [256]guintptr //G循环队列 runnext guintptr //高优先级的G,会先执行 // Available G's (status == Gdead) gfree *g //空闲G列表 gfreecnt int32 //空闲G数量 sudogcache []*sudog sudogbuf [128]*sudog tracebuf traceBufPtr // traceSweep indicates the sweep events should be traced. // This is used to defer the sweep start event until a span // has actually been swept. traceSweep bool // traceSwept and traceReclaimed track the number of bytes // swept and reclaimed by sweeping in the current sweep loop. traceSwept, traceReclaimed uintptr palloc persistentAlloc // per-P to avoid mutex // Per-P GC state gcAssistTime int64 // Nanoseconds in assistAlloc gcBgMarkWorker guintptr gcMarkWorkerMode gcMarkWorkerMode gcw gcWork runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point pad [sys.CacheLineSize]byte }
永远不会退出的调度(schedule)
当一个G执行完成后,会继续调用调度函数schedule,死循环就产生了
// goexit continuation on g0. func goexit0(gp *g) { _g_ := getg() casgstatus(gp, _Grunning, _Gdead) dropg() _g_.m.locked = 0 gfput(_g_.m.p.ptr(), gp) schedule() }
整体执行流程
mstart() => schedule() => findrunnable() => execute() => func() => goexit() => schedule()
M就绪 =>调度 => 查找可调度G => 执行G => 具体方法 => 执行完成 => 继续调度
入口函数是 _rt0_amd64_linux,需要说明的是,不同平台的入口函数名称会有所不同,该方法会调用runtime.rt0_go汇编。
rt0_go 做了大量的初始化工作,runtime.args 读取命令行参数、runtime.osinit 读取 CPU 数目,runtime.schedinit初始化Processor数目,最大的Machine数目等等。
除此之外,我们还看到了两个奇怪的 g0 和 m0 变量。m0 Machine 代表着当前初始化线程,而 g0 代表着初始化线程 m0 的 system stack,似乎还缺一个 p0 ?
实际上所有的 Processor 都会放到 allp 里。runtime.schedinit 会在调用 procresize 时为 m0 分配上 allp[0] 。所以到目前为止,初始化线程运行模式是符合上文提到的 G/P/M 模型的。
大量的初始化工作做完之后,会调用 runtime.newproc 为 mainPC 方法生成一个 Goroutine。 虽然 mainPC 并不是我们平时写的那个 main 函数,但是它会调用我们写的 main 函数,所以 main 函数是会以 Goroutine 的形式运行。
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8 LEAQ 8(SP), SI // argv MOVQ 0(SP), DI // argc MOVQ $main(SB), AX JMP AX TEXT main(SB),NOSPLIT,$-8 MOVQ $runtime·rt0_go(SB), AX JMP AX TEXT runtime·rt0_go(SB),NOSPLIT,$0 LEAQ runtime·g0(SB), CX MOVQ CX, g(BX) LEAQ runtime·m0(SB), AX // save m->g0 = g0 MOVQ CX, m_g0(AX) // save m0 to g0->m MOVQ AX, g_m(CX) CALL runtime·args(SB) CALL runtime·osinit(SB) //获取cpu数量,页大小 CALL runtime·schedinit(SB) //调度初始化 // create a new goroutine to start program MOVQ $runtime·mainPC(SB), AX // entry,执行runtime.main CALL runtime·newproc(SB) // start this M CALL runtime·mstart(SB) MOVL $0xf1, 0xf1 // crash RET DATA runtime·mainPC+0(SB)/8,$runtime·main(SB) GLOBL runtime·mainPC(SB),RODATA,$8 package runtime // The main goroutine. func main() { // Allow newproc to start new Ms. mainStarted = true gcenable() fn := main_init // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime fn() fn = main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime fn() exit(0) }
参考
https://github.com/golang/go (go源码)
https://github.com/qyuhen/book (雨痕,内容很棒很全面,已出书)