1. 創建go的入口函數
// Create a new g running fn.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
func newproc(fn *funcval) {gp := getg()pc := sys.GetCallerPC()systemstack(func() {newg := newproc1(fn, gp, pc, false, waitReasonZero)pp := getg().m.p.ptr()runqput(pp, newg, true)//true代表放置為runnext優先級g;內部隨機數使得優先級g降級為普通g;runnext位置僅1,cas進行新舊替換;普通g放置在p隊列尾if mainStarted {wakep() //內部使用cas 和 判斷自旋m數量 來保證不過度創建m 和 避免搶占}})
}
2.schedule函數
// 調度器的一輪操作:查找可運行的goroutine并執行它
// 該函數不會返回
func schedule() {mp := getg().m// 如果當前M持有鎖,則拋出錯誤(鎖的存在可能破壞調度邏輯)if mp.locks != 0 {throw("schedule: holding locks")}// 如果當前M被鎖定到某個goroutine,需要先釋放P再執行該Gif mp.lockedg != 0 {stoplockedm()execute(mp.lockedg.ptr(), false) // 該函數不會返回}// 我們不能從執行cgo調用的G中調度新的G,因為cgo調用正在使用M的g0棧if mp.incgo {throw("schedule: in cgo")}top:pp := mp.p.ptr()pp.preempt = false// 安全性檢查:如果當前M處于自旋狀態,本地運行隊列應該為空// 在調用checkTimers之前執行此檢查,因為checkTimers可能會調用goready// 將就緒的G放入本地運行隊列if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {throw("schedule: spinning with local work")}// 找到可運行的G(findRunnable會阻塞直到有工作可用)gp, inheritTime, tryWakeP := findRunnable()// findRunnable可能收集了allp快照。快照僅在findRunnable內部需要,這里清除它// 以便GC可以回收該slicemp.clearAllpSnapshot()// 如果即將調度一個非普通G(如GCworker或tracereader),需要喚醒一個P(如果有的話)if tryWakeP {wakep()}// 如果G被鎖定到某個M,則將當前P交給該鎖定的M,然后阻塞等待新的Pif gp.lockedm != 0 {startlockedm(gp)goto top}// 執行G(該函數不會返回)execute(gp, inheritTime)
}
3.schedule中的findrunnable函數
// 查找可運行的goroutine執行
// 優先嘗試從其他P竊取工作,或從本地/全局隊列獲取,或輪詢網絡
// tryWakeP表示返回的goroutine是非正常的(如GC工作線程、追蹤讀取器),調用者需要嘗試喚醒P
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {mp := getg().m// 此處與handoffp中的條件需保持一致:如果findrunnable會返回可運行的G,handoffp必須啟動一個M
top:// 可能已收集allp快照。快照僅在每次循環迭代時需要。清空快照以便GC回收切片mp.clearAllpSnapshot()pp := mp.p.ptr()// 如果處于gcwaitting狀態 將當前m暫停 重新進行查找if sched.gcwaiting.Load() {gcstopm()goto top}if pp.runSafePointFn != 0 {runSafePointFn()}// now和pollUntil為后續工作竊取保存,可能竊取定時器// 在now到執行工作竊取期間不能阻塞,以確保這些數值相關now, pollUntil, _ := pp.timers.check(0, nil)// 嘗試調度追蹤讀取器if traceEnabled() || traceShuttingDown() {gp := traceReader()if gp != nil {trace := traceAcquire()casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0)traceRelease(trace)}return gp, false, true}}// 嘗試調度GC工作線程if gcBlackenEnabled != 0 {gp, tnow := gcController.findRunnableGCWorker(pp, now)if gp != nil {return gp, false, true}now = tnow}// 偶爾檢查全局可運行隊列以確保公平性// 否則兩個goroutine可能通過不斷重啟彼此完全占用本地隊列if pp.schedtick%61 == 0 && !sched.runq.empty() {lock(&sched.lock)gp := globrunqget()unlock(&sched.lock)if gp != nil {return gp, false, false}}// 喚醒終結器Gif fingStatus.Load()&(fingWait|fingWake) == fingWait|fingWake {if gp := wakefing(); gp != nil {ready(gp, 0, true)}}// 喚醒一個或多個清理Gif gcCleanups.needsWake() {gcCleanups.wake()}if *cgo_yield != nil {asmcgocall(*cgo_yield, nil)}// 本地運行隊列if gp, inheritTime := runqget(pp); gp != nil {return gp, inheritTime, false}// 全局運行隊列if !sched.runq.empty() {lock(&sched.lock)gp, q := globrunqgetbatch(int32(len(pp.runq)) / 2)unlock(&sched.lock)if gp != nil {if runqputbatch(pp, &q); !q.empty() {throw("Couldn't put Gs into empty local runq")}return gp, false, false}}// 網絡輪詢// 該netpoll是工作竊取前的優化操作// 如果沒有等待者或線程已阻塞在netpoll中,可以安全跳過// 若與阻塞線程存在邏輯競爭(如已返回但未設置lastpoll),后續仍會處理// 為避免多核機器內核爭用,每次僅允許一個線程進行輪詢if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 && sched.pollingNet.Swap(1) == 0 {list, delta := netpoll(0)sched.pollingNet.Store(0)if !list.empty() { // 非阻塞gp := list.pop()injectglist(&list)netpollAdjustWaiters(delta)trace := traceAcquire()casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0)traceRelease(trace)}return gp, false, false}}// 自旋線程:從其他P竊取工作// 自旋線程數量限制為忙碌P數量的一半// 這是為了防止GOMAXPROCS>>1但程序并行度低時的CPU過載if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {if !mp.spinning {mp.becomeSpinning()}gp, inheritTime, tnow, w, newWork := stealWork(now)if gp != nil {// 成功竊取return gp, inheritTime, false}if newWork {// 可能有新的定時器或GC工作,重啟循環goto top}now = tnowif w != 0 && (pollUntil == 0 || w < pollUntil) {// 更早的定時器需要等待pollUntil = w}}// 沒有工作可做// 如果處于GC標記階段且有安全掃描/染色對象的工作,則運行空閑時間標記而非放棄Pif gcBlackenEnabled != 0 && gcMarkWorkAvailable(pp) && gcController.addIdleMarkWorker() {node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())if node != nil {pp.gcMarkWorkerMode = gcMarkWorkerIdleModegp := node.gp.ptr()trace := traceAcquire()casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0)traceRelease(trace)}return gp, false, false}gcController.removeIdleMarkWorker()}// WASM平臺專用邏輯// 如果回調返回且沒有其他goroutine喚醒,則喚醒事件處理goroutine// 該goroutine會暫停執行直到回調被觸發gp, otherReady := beforeIdle(now, pollUntil)if gp != nil {trace := traceAcquire()casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0)traceRelease(trace)}return gp, false, false}if otherReady {goto top}// 釋放P前獲取allp快照,該切片可能在不再阻塞安全點時被修改// 不需要快照內容因為直到cap(allp)都是不可變的// 通過mp.clearAllpSnapshot(在schedule中)和每次循環迭代后清除快照allpSnapshot := mp.snapshotAllp()// 同時快照掩碼。值變化可以接受,但長度不能在我們處理時變化idlepMaskSnapshot := idlepMasktimerpMaskSnapshot := timerpMask// 釋放P并阻塞lock(&sched.lock)if sched.gcwaiting.Load() || pp.runSafePointFn != 0 {unlock(&sched.lock)goto top}if !sched.runq.empty() {gp, q := globrunqgetbatch(int32(len(pp.runq)) / 2)unlock(&sched.lock)if gp == nil {throw("global runq empty with non-zero runqsize")}if runqputbatch(pp, &q); !q.empty() {throw("Couldn't put Gs into empty local runq")}return gp, false, false}if !mp.spinning && sched.needspinning.Load() == 1 {// 參考下方"Delicate dance"注釋mp.becomeSpinning()unlock(&sched.lock)goto top}if releasep() != pp {throw("findrunnable: wrong p")}now = pidleput(pp, now)unlock(&sched.lock)// 精密的舞蹈:線程從自旋狀態轉為非自旋狀態時,可能與新工作提交并發// 必須先減少nmspinning計數,再通過StoreLoad內存屏障檢查所有源// 若順序顛倒,其他線程可能在我們檢查完所有源后提交工作但之前已減少nmspinning// 導致無人喚醒線程執行工作//// 適用于以下工作源:// * 加入全局或P本地運行隊列的goroutine// * P本地定時器堆的新/修改定時器// * 空閑優先級的GC工作(除非golang.org/issue/19112)//// 如果發現新工作,需要恢復m.spinning狀態以喚醒新工作線程// (因為可能有多個饑餓的goroutine)//// 但若發現新工作后又觀察到沒有空閑P(在此處或resetspinning中),則存在問題// 我們可能與上方非自旋M的釋放P并發競爭,導致P進入空閑狀態// 這會丟失工作守恒(空閑P時仍有可運行工作),極端情況下可能導致死鎖//// 通過sched.needspinning與非自旋M同步// 當非自旋M準備釋放P時,若發現needspinning被設置則中止釋放并轉為自旋// 若沒有并發競爭且系統完全負載,則無需自旋線程,下一個自然轉為自旋的線程會清除標志// 另見文件頂部的"Worker thread parking/unparking"注釋wasSpinning := mp.spinningif mp.spinning {mp.spinning = falseif sched.nmspinning.Add(-1) < 0 {throw("findrunnable: negative nmspinning")}// 注意正確性要求:只有最后一個從自旋轉為非自旋的線程需要重新檢查// 但運行時存在一些nmspinning的臨時增加未通過此路徑減少的情況// 因此必須保守地對所有自旋線程執行檢查// 參考https://go.dev/issue/43997// 再次檢查全局和P運行隊列lock(&sched.lock)if !sched.runq.empty() {pp, _ := pidlegetSpinning(0)if pp != nil {gp, q := globrunqgetbatch(int32(len(pp.runq)) / 2)unlock(&sched.lock)if gp == nil {throw("global runq empty with non-zero runqsize")}if runqputbatch(pp, &q); !q.empty() {throw("Couldn't put Gs into empty local runq")}acquirep(pp)mp.becomeSpinning()return gp, false, false}}unlock(&sched.lock)pp := checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)if pp != nil {acquirep(pp)mp.becomeSpinning()goto top}// 再次檢查空閑優先級GC工作pp, gp := checkIdleGCNoP()if pp != nil {acquirep(pp)mp.becomeSpinning()// 運行空閑工作線程pp.gcMarkWorkerMode = gcMarkWorkerIdleModetrace := traceAcquire()casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0)traceRelease(trace)}return gp, false, false}// 最后檢查定時器創建/過期是否與自旋狀態轉換并發// 注意此處不能使用checkTimers因為它可能分配內存,而我們沒有活動P時不允許分配pollUntil = checkTimersNoP(allpSnapshot, timerpMaskSnapshot, pollUntil)}// 此時不再需要allp快照,但沒有P時不能清除(需寫屏障)// 輪詢網絡直到下一個定時器if netpollinited() && (netpollAnyWaiters() || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 {sched.pollUntil.Store(pollUntil)if mp.p != 0 {throw("findrunnable: netpoll with p")}if mp.spinning {throw("findrunnable: netpoll with spinning")}delay := int64(-1)if pollUntil != 0 {if now == 0 {now = nanotime()}delay = pollUntil - nowif delay < 0 {delay = 0}}if faketime != 0 {// 使用假時間時直接輪詢delay = 0}list, delta := netpoll(delay) // 阻塞直到有新工作// 刷新時間戳(可能阻塞后)now = nanotime()sched.pollUntil.Store(0)sched.lastpoll.Store(now)if faketime != 0 && list.empty() {// 使用假時間且無就緒工作時停止M// 當所有M停止時,checkdead會調用timejumpstopm()goto top}lock(&sched.lock)pp, _ := pidleget(now)unlock(&sched.lock)if pp == nil {injectglist(&list)netpollAdjustWaiters(delta)} else {acquirep(pp)if !list.empty() {gp := list.pop()injectglist(&list)netpollAdjustWaiters(delta)trace := traceAcquire()casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0)traceRelease(trace)}return gp, false, false}if wasSpinning {mp.becomeSpinning()}goto top}} else if pollUntil != 0 && netpollinited() {pollerPollUntil := sched.pollUntil.Load()if pollerPollUntil == 0 || pollerPollUntil > pollUntil {netpollBreak()}}stopm()goto top
}
3.1 適當檢查全局隊列策略
// 偶爾檢查全局可運行隊列以確保公平性// 否則兩個goroutine可能通過不斷重啟彼此完全占用本地隊列if pp.schedtick%61 == 0 && !sched.runq.empty() {lock(&sched.lock)gp := globrunqget()unlock(&sched.lock)if gp != nil {return gp, false, false}}// local runqif gp, inheritTime := runqget(pp); gp != nil {return gp, inheritTime, false}// global runqif !sched.runq.empty() {lock(&sched.lock)gp, q := globrunqgetbatch(int32(len(pp.runq)) / 2)unlock(&sched.lock)if gp != nil {if runqputbatch(pp, &q); !q.empty() {throw("Couldn't put Gs into empty local runq")}return gp, false, false}}
????????這是findRunnable函數的一部分,其中檢查全局隊列,每61次檢查全局隊列,并取一個g,日常使用本地隊列獲取g。
定次數檢查全局隊列的狀態轉換圖
轉換前
轉換后
檢查本地隊列的狀態轉換圖
// 從本地可運行隊列獲取Goroutine
// 如果 inheritTime 為 true,gp 應該繼承當前時間片中剩余的時間
// 否則,它應該開始一個新的時間片
// 該函數僅由當前P(邏輯處理器)的擁有者執行
func runqget(pp *p) (gp *g, inheritTime bool) {// 如果存在 runnext,則優先獲取該Goroutinenext := pp.runnext// 如果 runnext 不為0且CAS(比較并交換)操作成功,說明該Goroutine未被其它P搶占// 注意:若CAS失敗,可能是其它P將runnext置為0,無需重試// 因為只有當前P能將runnext設置為非0值if next != 0 && pp.runnext.cas(next, 0) {return next.ptr(), true}// 無限循環嘗試從環形隊列中獲取Goroutinefor {// 原子讀取隊列頭指針(load-acquire語義:與其它消費者同步)h := atomic.LoadAcq(&pp.runqhead)t := pp.runqtail// 如果隊列為空(尾指針等于頭指針),返回nilif t == h {return nil, false}// 計算Goroutine在環形隊列中的索引// 并獲取對應的Goroutine指針gp = pp.runq[h%uint32(len(pp.runq))].ptr()// 原子更新頭指針(cas-release語義:提交消耗,保證操作可見性)// 若成功,則返回獲取到的Goroutineif atomic.CasRel(&pp.runqhead, h, h+1) {return gp, false}}
}
????????隊列使用環形隊列的方式進行存儲,首先檢查runnext優先級g是否存在(runnext只能被當前p置為0);然后檢查本地隊列,獲取其中的g,并且返回。
轉換前
轉換后
檢查全局隊列狀態轉移圖
// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
func globrunqgetbatch(n int32) (gp *g, q gQueue) {assertLockHeld(&sched.lock) //必須擁有全局隊列鎖if sched.runq.size == 0 {return}n = min(n, sched.runq.size, sched.runq.size/gomaxprocs+1)gp = sched.runq.pop()n--for ; n > 0; n-- {gp1 := sched.runq.pop()q.pushBack(gp1)}return
}
執行到本函數時,前提是上一步驟本地隊列為空的情況下,將從全局隊列中取g,g的數量為給定的n(輸入為本地隊列長度的一半),全局隊列長度,全局隊列長度/最大設置p數量 + 1的最小值。
// runqputbatch tries to put all the G's on q on the local runnable queue.
// If the local runq is full the input queue still contains unqueued Gs.
// Executed only by the owner P.
func runqputbatch(pp *p, q *gQueue) {if q.empty() {return}h := atomic.LoadAcq(&pp.runqhead)t := pp.runqtailn := uint32(0)for !q.empty() && t-h < uint32(len(pp.runq)) {gp := q.pop()pp.runq[t%uint32(len(pp.runq))].set(gp)t++n++}// 隨機化處理 打亂g加入隊列的順序 防止饑餓等問題if randomizeScheduler {// 計算偏移量off := func(o uint32) uint32 {return (pp.runqtail + o) % uint32(len(pp.runq))}for i := uint32(1); i < n; i++ {j := cheaprandn(i + 1)pp.runq[off(i)], pp.runq[off(j)] = pp.runq[off(j)], pp.runq[off(i)]}}atomic.StoreRel(&pp.runqtail, t)return
}
將從全局隊列獲取的g,盡可能的存放入本地p隊列當中。
轉換前
轉換后
3.2 網絡輪詢的處理
// 網絡輪詢
// 此netpoll僅作為優化措施,用于在工作竊取之前嘗試獲取網絡事件。
// 如果沒有等待者或線程已阻塞在netpoll中,可以安全地跳過此步驟。
// 當與阻塞線程存在邏輯競爭時(例如該線程已從netpoll返回但尚未設置lastpoll),
// 本線程仍會執行阻塞式netpoll操作。
// 為避免多核機器上的內核爭用,我們確保同一時間只有一個線程在進行輪詢。
if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 && sched.pollingNet.Swap(1) == 0 {list, delta := netpoll(0) // 非阻塞式輪詢sched.pollingNet.Store(0) // 重置輪詢狀態標志if !list.empty() {gp := list.pop() // 獲取等待的goroutineinjectglist(&list) // 將goroutine列表注入運行隊列netpollAdjustWaiters(delta) // 調整等待計數trace := traceAcquire()// 原子狀態轉換:從等待狀態轉為可運行狀態casgstatus(gp, _Gwaiting, _Grunnable)if trace.ok() {trace.GoUnpark(gp, 0) // 跟蹤goroutine解除阻塞traceRelease(trace) // 釋放跟蹤資源}return gp, false, false // 返回可運行的goroutine}
}func injectglist(glist *gList) {if glist.empty() {return}lock(&sched.lock)var n intfor n = 0; !glist.empty(); n++ {gp := glist.pop()casgstatus(gp, _Gwaiting, _Grunnable)globrunqput(gp) // 其余goroutine放入全局隊列}unlock(&sched.lock)// 嘗試啟動新的M來處理這些goroutinefor ; n != 0 && sched.npidle.Load() != 0; n-- {startm(nil, false, false)}
}
狀態轉移前
狀態轉移后
3.3 竊取其他p的本地g操作
// 自旋的M:從其他P偷取工作
//
// 限制自旋M的數量不超過忙碌P的一半。
// 這是為了防止當GOMAXPROCS遠大于1但程序并行度較低時
// 出現過高的CPU消耗。
if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {if !mp.spinning {mp.becomeSpinning()}gp, inheritTime, tnow, w, newWork := stealWork(now)if gp != nil {// 成功偷取工作return gp, inheritTime, false}if newWork {// 可能有新的定時器或GC工作;需要重新啟動以發現goto top}now = tnowif w != 0 && (pollUntil == 0 || w < pollUntil) {// 有更早的定時器需要等待pollUntil = w}
}
核心操作函數
????????在本地p隊列為空,隨機獲取其他p的g(最大偷取數量為目標p隊列長度的一半),持有目標p的runq.lock;在本地隊列p為空且定時器堆也為空,則獲取其他p的定時器來獲取關聯g,持有目標p的mu。
// stealWork 嘗試從任何P中偷取可運行的goroutine或定時器
//
// 如果newWork為true,表示可能有新工作被就緒
//
// 如果now不為0,則表示傳入的當前時間。stealWork返回傳入的時間或
// 當now為0時返回當前時間
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {pp := getg().m.p.ptr()ranTimer := falseconst stealTries = 4for i := 0; i < stealTries; i++ {stealTimersOrRunNextG := i == stealTries-1for enum := stealOrder.start(cheaprand()); !enum.done(); enum.next() {if sched.gcwaiting.Load() {// GC工作可能已就緒return nil, false, now, pollUntil, true}p2 := allp[enum.position()]if pp == p2 {continue}// 從p2偷取定時器。這個checkTimers調用是唯一可能持有// 其他P的定時器鎖的地方。我們在這個循環的最后階段檢查// runnext之前先檢查定時器,因為從其他P的runnext偷取// 應該是最后的選擇,如果存在可偷取的定時器優先處理//// 我們只在其中一個偷取循環中檢查定時器,因為now的值// 在這個循環中不會變化,多次檢查相同時間點的定時器// 可能浪費性能//// timerpMask告訴我們P是否可能擁有定時器。如果P不可能// 擁有定時器,則無需檢查if stealTimersOrRunNextG && timerpMask.read(enum.position()) {tnow, w, ran := p2.timers.check(now, nil)now = tnowif w != 0 && (pollUntil == 0 || w < pollUntil) {pollUntil = w}if ran {// 運行定時器可能使任意數量的G就緒// 并將它們添加到本P的本地運行隊列中// 這會破壞runqsteal的假設(運行隊列有足夠空間)// 所以現在需要檢查本P本地隊列是否有G可運行if gp, inheritTime := runqget(pp); gp != nil {return gp, inheritTime, now, pollUntil, ranTimer}ranTimer = true}}// 如果p2處于空閑狀態,無需嘗試偷取if !idlepMask.read(enum.position()) {if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {return gp, false, now, pollUntil, ranTimer}}}}// 未找到可偷取的goroutine。不管怎樣,運行定時器可能使// 某些goroutine就緒。指示需要等待的下一個定時器return nil, false, now, pollUntil, ranTimer
}
狀態轉移前
狀態轉移后
????????最后,當所有的工作都找不到可用的g來運行,就會休眠m,等待喚醒。
3.4 execute喚醒
// 將gp調度到當前M上運行
// 如果inheritTime為true,則gp繼承當前時間片的剩余時間,否則開始新的時間片
// 此函數不會返回// 允許寫屏障操作,因為該函數在多個位置獲取P后立即調用
//go:yeswritebarrierrec
func execute(gp *g, inheritTime bool) {mp := getg().mif goroutineProfile.active {// 確保gp的堆棧信息已記錄到goroutine profile中// 記錄時保持goroutine profiler首次暫停世界時的狀態tryRecordGoroutineProfile(gp, nil, osyield)}// 在進入_Grunning狀態前為運行中的G綁定Mmp.curg = gpgp.m = mpgp.syncSafePoint = false // 清除可能由morestack設置的標志casgstatus(gp, _Grunnable, _Grunning) // 原子操作修改goroutine狀態gp.waitsince = 0 // 清除等待時間戳gp.preempt = false // 清除搶占標志gp.stackguard0 = gp.stack.lo + stackGuard // 設置棧保護指針if !inheritTime {mp.p.ptr().schedtick++ // 時間片不繼承時增加調度計數器}// 檢查是否需要開啟/關閉CPU profilerhz := sched.profilehzif mp.profilehz != hz {setThreadCPUProfiler(hz) // 設置線程CPU分析頻率}trace := traceAcquire()if trace.ok() {trace.GoStart() // 啟動追蹤事件traceRelease(trace) // 釋放追蹤資源}gogo(&gp.sched) // 進入goroutine執行入口
}
狀態轉移前
狀態轉移
3.5 gosched 主動讓出
// Gosched讓出處理器,允許其他goroutine運行。該函數不會掛起當前goroutine,
// 因此當前goroutine會在后續自動恢復執行。
//
//go:nosplit // 編譯器指令:禁止函數分割棧(保持原棧結構執行)
func Gosched() {// 檢查是否有定時器需要處理(如超時事件)checkTimeouts()// 調用系統調用處理函數,將控制權交還調度器// mcall會切換到系統棧執行gosched_m函數mcall(gosched_m)
}// goschedImpl函數讓出處理器,允許其他goroutine運行。preempted參數表示是否是被搶占的情況。
// 該函數不會掛起當前goroutine,執行會自動恢復。
func goschedImpl(gp *g, preempted bool) {// 獲取追蹤資源trace := traceAcquire()// 讀取當前goroutine狀態status := readgstatus(gp)// 驗證當前狀態是否為運行中狀態(排除掃描狀態)if status&^_Gscan != _Grunning {dumpgstatus(gp) // 打印狀態信息用于調試throw("bad g status") // 狀態異常時拋出錯誤}// 如果追蹤可用,記錄相關事件if trace.ok() {// 在狀態轉換前記錄追蹤事件,可能需要獲取堆棧信息// 但轉換后將不再擁有當前堆棧if preempted {trace.GoPreempt() // 搶占事件追蹤} else {trace.GoSched() // 主動讓出處理器的追蹤}}// 原子操作將當前goroutine狀態從運行中改為可運行casgstatus(gp, _Grunning, _Grunnable)// 釋放追蹤資源if trace.ok() {traceRelease(trace)}// 從當前M的綁定關系中解綁goroutinedropg()// 加鎖操作lock(&sched.lock)// 將當前goroutine放入全局運行隊列globrunqput(gp)// 解鎖操作unlock(&sched.lock)// 如果主程序已啟動,喚醒空閑的處理器if mainStarted {wakep()}// 調度器開始尋找下一個要運行的goroutineschedule()
}// Gosched函數在g0棧上的執行入口(被搶占后執行)
func gosched_m(gp *g) {// 調用核心實現,傳入preempted=false表示主動讓出而非被搶占goschedImpl(gp, false)
}
轉移前
轉移后
3.6 系統調用
// 當前goroutine g 即將進入系統調用
// 記錄其不再占用CPU的狀態
// 此函數僅由Go系統調用庫和cgo調用調用,不會被運行時的低層系統調用使用// entersyscall不能分割棧:保存操作必須使g->sched指向調用者的棧段,因為
// entersyscall將在立即返回后執行。在此期間g處于Gsyscall狀態,但g.sched字段
// 的結構可能不完整,不能讓GC觀察到這種不一致狀態// entersyscall調用的任何函數都不能分割棧
// 在活躍的系統調用期間,我們無法安全地移動棧,因為不知道uintptr參數中哪些
// 是實際的指針(指向棧內部)。實踐中,這意味著快速路徑必須使用無分割操作,
// 慢速路徑則需要通過systemstack在系統棧上執行更大操作// reentersyscall是cgo回調使用的入口點,用于恢復顯式保存的SP和PC
// 這在需要從調用棧更上層的函數調用exitsyscall時是必要的,因為g.syscallsp
// 必須始終指向有效的棧幀。下面的entersyscall是正常系統調用入口,從調用者獲取SP和PC//go:nosplit
func reentersyscall(pc, sp, bp uintptr) {trace := traceAcquire()gp := getg()// 禁用搶占,因為此時g處于Gsyscall狀態但g.sched字段可能不一致gp.m.locks++// entersyscall不能調用可能分割/擴展堆棧的函數(詳情見上方注釋)// 通過替換堆棧保護指針為會觸發堆棧檢查的值,并設置標志位讓newstack終止gp.stackguard0 = stackPreemptgp.throwsplit = true// 保留SP用于GC和追蹤回溯save(pc, sp, bp)gp.syscallsp = spgp.syscallpc = pcgp.syscallbp = bpcasgstatus(gp, _Grunning, _Gsyscall)if staticLockRanking {// 靜態鎖排序時,casgstatus可能調用systemstack并覆蓋g.schedsave(pc, sp, bp)}if gp.syscallsp < gp.stack.lo || gp.stack.hi < gp.syscallsp {systemstack(func() {print("entersyscall inconsistent sp ", hex(gp.syscallsp), " [", hex(gp.stack.lo), ",", hex(gp.stack.hi), "]\n")throw("entersyscall")})}if gp.syscallbp != 0 && gp.syscallbp < gp.stack.lo || gp.stack.hi < gp.syscallbp {systemstack(func() {print("entersyscall inconsistent bp ", hex(gp.syscallbp), " [", hex(gp.stack.lo), ",", hex(gp.stack.hi), "]\n")throw("entersyscall")})}if trace.ok() {systemstack(func() {trace.GoSysCall() // 記錄系統調用事件traceRelease(trace) // 釋放追蹤資源})// systemstack本身會覆蓋g.sched.{pc,sp},而我們可能需要這些信息// 當G真正被系統調用阻塞時save(pc, sp, bp)}if sched.sysmonwait.Load() {// 系統監控等待期間需要特殊處理systemstack(entersyscall_sysmon)save(pc, sp, bp)}if gp.m.p.ptr().runSafePointFn != 0 {// 在當前棧上執行runSafePointFn可能導致棧分割// 通過系統棧執行確保安全systemstack(runSafePointFn)save(pc, sp, bp)}// 記錄當前系統調用計數gp.m.syscalltick = gp.m.p.ptr().syscalltickpp := gp.m.p.ptr()pp.m = 0gp.m.oldp.set(pp)gp.m.p = 0atomic.Store(&pp.status, _Psyscall) // 更新P狀態為系統調用中if sched.gcwaiting.Load() {// 如果GC正在等待,需要特殊處理systemstack(entersyscall_gcwait)save(pc, sp, bp)}// 減少鎖計數器,允許搶占恢復gp.m.locks--
}
轉移前
轉移后
3.7 網絡阻塞
// 將當前goroutine置于等待狀態,并在系統棧上調用unlockf函數
//
// 如果unlockf返回false,則當前goroutine會被恢復執行
//
// unlockf不能訪問該G的棧,因為G可能在調用gopark和unlockf之間被移動到其他M
//
// 注意:由于unlockf是在將G置于等待狀態后調用的,調用時G可能已經被其他goroutine準備就緒
// 除非有外部同步機制阻止G被準備。如果unlockf返回false,必須保證G不能被外部準備
//
// reason參數說明goroutine被阻塞的原因,會在堆棧跟蹤和堆轉儲中顯示
// 原因應該保持唯一性和描述性,不要復用原因,應添加新原因
//
// gopark應該作為運行時內部實現細節
// 但廣泛使用的包通過linkname指令訪問它
// 羞恥堂成員包括:
// - gvisor.dev/gvisor
// - github.com/sagernet/gvisor
//
// 不要刪除或修改類型簽名(見go.dev/issue/67401)
//
//go:linkname gopark
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceReason traceBlockReason, traceskip int) {if reason != waitReasonSleep {checkTimeouts() // 兩個goroutine可能同時讓調度器繁忙,此時需要檢查超時}mp := acquirem() // 獲取當前Mgp := mp.curg // 當前M綁定的Gstatus := readgstatus(gp)// 驗證當前G狀態是否合法(運行中或掃描運行中)if status != _Grunning && status != _Gscanrunning {throw("gopark: bad g status")}mp.waitlock = lock // 保存等待鎖mp.waitunlockf = unlockf // 保存解鎖函數gp.waitreason = reason // 設置等待原因mp.waitTraceBlockReason = traceReason // 設置追蹤阻塞原因mp.waitTraceSkip = traceskip // 設置追蹤跳過層級releasem(mp) // 釋放當前M// 不能在此處執行可能導致G在多個M之間移動的操作mcall(park_m) // 調用系統調用處理函數
}
狀態轉移前
狀態轉移后
3.8 定時器操作
// time.Sleep函數的實現
//
// 該函數使當前goroutine休眠至少ns納秒
//
//go:linkname timeSleep time.Sleep
func timeSleep(ns int64) {if ns <= 0 {return}gp := getg() // 獲取當前goroutinet := gp.timer // 獲取當前goroutine的定時器if t == nil {// 如果沒有定時器則創建新定時器t = new(timer)t.init(goroutineReady, gp) // 初始化定時器回調為goroutineReadyif gp.bubble != nil { // 如果處于時間氣泡中t.isFake = true // 標記為虛擬定時器}gp.timer = t // 綁定定時器到當前goroutine}var now int64if bubble := gp.bubble; bubble != nil {// 如果處于時間氣泡中,使用氣泡內的時間戳now = bubble.now} else {// 否則獲取當前實際時間now = nanotime()}// 計算喚醒時間(當前時間+休眠時長)when := now + ns// 檢查溢出情況if when < 0 {when = maxWhen // 設置為最大時間值}gp.sleepWhen = when // 保存喚醒時間// 根據是否為虛擬定時器選擇不同處理方式if t.isFake {// 在協程內部調用定時器重置(因為處于時間氣泡中)// 不需要擔心定時器函數在協程掛起前執行,因為時間不會在掛起前推進resetForSleep(gp, nil)// 掛起當前協程,等待時間氣泡中的時間推進gopark(nil, nil, waitReasonSleep, traceBlockSleep, 1)} else {// 使用系統調度器進行定時器重置gopark(resetForSleep, nil, waitReasonSleep, traceBlockSleep, 1)}
}
定時器阻塞前
定時器喚醒
3.9 Goroutine正常結束
// Finishes execution of the current goroutine.
func goexit1() {if raceenabled {if gp := getg(); gp.bubble != nil {racereleasemergeg(gp, gp.bubble.raceaddr())}racegoend()}trace := traceAcquire()if trace.ok() {trace.GoEnd()traceRelease(trace)}mcall(goexit0)
}// goexit continuation on g0.
func goexit0(gp *g) {gdestroy(gp)schedule()
}
3.10?搶占式調度 - 信號/棧增長
func retake(now int64) uint32 {n := 0// 防止allp切片發生變化。這個鎖只有在暫停世界時才可能被競爭// This lock will be completely uncontended unless we're already stopping the world.lock(&allpLock)// 我們不能在循環中使用range遍歷allp,因為可能會// 臨時釋放allpLock。因此需要每次循環都重新獲取allp。for i := 0; i < len(allp); i++ {pp := allp[i]if pp == nil {// 這可能發生在procresize已經擴容// allp但尚未創建新的P時continue}pd := &pp.sysmonticks := pp.statussysretake := falseif s == _Prunning || s == _Psyscall {// 如果某個G在同一個schedtick上運行太久就搶占// 可能是單個長時間運行的goroutine,或通過// runnext運行的一系列goroutine共享的調度時間片t := int64(pp.schedtick)if int64(pd.schedtick) != t {pd.schedtick = uint32(t)pd.schedwhen = now} else if pd.schedwhen+forcePreemptNS <= now {preemptone(pp)// 如果是系統調用狀態,preemptone()可能失效// 因為此時M和P沒有綁定sysretake = true}}if s == _Psyscall {// 如果系統調用時間超過1個sysmon tick(至少20us)就回收P// 一方面我們不希望在沒有其他工作時回收P// 另一方面我們需要回收P以避免sysmon線程無法進入深度睡眠t := int64(pp.syscalltick)if !sysretake && int64(pd.syscalltick) != t {pd.syscalltick = uint32(t)pd.syscallwhen = nowcontinue}// 在CAS操作前需要減少空閑鎖定的M數量// 否則從系統調用返回的M可能增加nmidle并報告死鎖// (假裝有1個M在運行)unlock(&allpLock)incidlelocked(-1)trace := traceAcquire()if atomic.Cas(&pp.status, s, _Pidle) {if trace.ok() {trace.ProcSteal(pp, false)traceRelease(trace)}n++pp.syscalltick++handoffp(pp)} else if trace.ok() {traceRelease(trace)}incidlelocked(1)lock(&allpLock)}}unlock(&allpLock)return uint32(n)
}// 通知在處理器P上運行的goroutine停止
// 該函數是盡力而為的,可能會失敗或通知錯誤的goroutine
// 即使通知了正確的goroutine,如果它同時正在執行newstack操作,也可能忽略請求
// 無需持有任何鎖
// 返回true表示已發起搶占請求
// 實際搶占將在未來某個時刻發生,并通過gp->status不再是Grunning狀態來體現
func preemptone(pp *p) bool {mp := pp.m.ptr()if mp == nil || mp == getg().m {return false}gp := mp.curgif gp == nil || gp == mp.g0 {return false}gp.preempt = true// 每個goroutine中的調用都會檢查棧溢出// 通過比較當前棧指針和gp->stackguard0的值// 將gp->stackguard0設置為StackPreempt值// 可以將搶占請求合并到正常的棧溢出檢查流程中gp.stackguard0 = stackPreempt// 請求對這個P進行異步搶占if preemptMSupported && debug.asyncpreemptoff == 0 {pp.preempt = truepreemptM(mp)}return true
}
????????檢測是否有g占用過久cpu,通過信號機制強制進行調度切換。
搶占檢測
搶占執行
4.其他操作原因
4.1 cgo調用為什么不能搶占
(1)?棧隔離
- Go 調用 C 時,會創建一個新的?C 棧,與 Go 棧分離。
- C 代碼無法直接操作 Go 棧,反之亦然。
(2)?調度器的限制
- 搶占機制失效:Go 的搶占式調度依賴 g0 棧。在 cgo 調用期間,g0 棧被占用,調度器無法中斷當前 goroutine。
- M 與 P 的綁定:執行 cgo 調用的 M 會與 P(邏輯處理器)解綁,直到 C 調用返回。
(3)?內存和垃圾回收
- C 代碼不能被 Go 的垃圾回收器(GC)管理,因此需要手動處理內存(如?
C.free
?釋放 C 分配的內存)。 - 如果 C 代碼中分配的內存未釋放,可能導致內存泄漏。
4.2 為什么需要clearAllpSnapshot()
避免內存泄漏
allpSnapshot
是對所有P(邏輯處理器)的引用快照。若在循環中重復保留舊快照,會導致內存中存在大量不再使用的P引用。Go的GC無法回收被強引用占用的內存,長期積累可能引發內存泄漏。解除GC壓力
快照作為slice類型,其底層數組會持有P對象的引用。即使當前迭代結束后不再使用該快照,GC仍需跟蹤這些引用以判斷是否可回收。主動清零(clearAllpSnapshot()
)可立即解除引用關系,允許GC回收相關內存。確保數據一致性
P的狀態在運行時可能被動態修改(如遷移、銷毀)。若保留舊快照,后續操作可能基于過期數據,導致邏輯錯誤。每次迭代后清理快照,可強制下一次迭代重新獲取最新狀態。并發安全需求
在多線程環境中,未清理的快照可能被其他goroutine訪問。通過及時釋放快照,減少競態條件的風險,確保每次迭代的數據來源獨立且最新。
4.3 為什么stealwork里需要竊取定時器
????????每個p都有獨屬于自己的定時器和定時器隊列,結構體如下,會維護以g的過期時間作為值維護的最小堆,保證最早到期的g優先處理。同時每個p私有化定時器,也可以減少鎖的競爭。
// A timers is a per-P set of timers.
// timers 是每個P(邏輯處理器)的定時器集合
type timers struct {// mu protects timers; timers are per-P, but the scheduler can// access the timers of another P, so we have to lock.// mu 用于保護定時器;雖然定時器是每個P私有的,但調度器可能訪問其他P的定時器// 因此需要鎖來保證并發安全mu mutex// heap is the set of timers, ordered by heap[i].when.// Must hold lock to access.// heap 是定時器數組,按 heap[i].when(觸發時間)排序// 訪問此字段時必須持有鎖heaptimerWhen// len is an atomic copy of len(heap).// len 是 heap 長度的原子副本,用于并發讀取len atomic.Uint32// zombies is the number of timers in the heap// that are marked for removal.// zombies 是堆中標記為待移除的定時器數量zombies atomic.Int32// raceCtx is the race context used while executing timer functions.// raceCtx 是執行定時器函數時使用的競態檢測上下文raceCtx uintptr// minWhenHeap is the minimum heap[i].when value (= heap[0].when).// The wakeTime method uses minWhenHeap and minWhenModified// to determine the next wake time.// If minWhenHeap = 0, it means there are no timers in the heap.// minWhenHeap 是堆中最小的觸發時間(即 heap[0].when)// wakeTime 方法會結合 minWhenHeap 和 minWhenModified// 計算下一個喚醒時間// 若 minWhenHeap = 0,表示堆中無定時器minWhenHeap atomic.Int64// minWhenModified is a lower bound on the minimum// heap[i].when over timers with the timerModified bit set.// If minWhenModified = 0, it means there are no timerModified timers in the heap.// minWhenModified 是所有標記為 timerModified 的定時器中最小的觸發時間的下界// 若 minWhenModified = 0,表示堆中無 timerModified 的定時器// timerModified 標志用于表示定時器被修改過(例如重新設置觸發時間)minWhenModified atomic.Int64
}