以 go1.23.3 linux/amd64 為例。
定時器示例代碼:
package mainimport ("context""fmt""time"
)var ctx context.Contextfunc main() {timeout := 600 * time.Secondctx, _ = context.WithTimeout(context.Background(), timeout)deadline, _ := ctx.Deadline()fmt.Println("process start", time.Now().Format(time.DateTime))fmt.Println("ctx deadline", deadline.Format(time.DateTime))go func() {defer func() {fmt.Println("goroutine exit", time.Now().Format(time.DateTime))}()for {select {case <-ctx.Done():fmt.Println("ctx.Done", time.Now().Format(time.DateTime))returndefault:fmt.Println("do something start", time.Now().Format(time.DateTime))time.Sleep(60 * time.Second)fmt.Println("do something end ", time.Now().Format(time.DateTime))}}}()time.Sleep(timeout + 10*time.Second)fmt.Println("process exit", time.Now().Format(time.DateTime))
}
定時器創建流程:
定時器的類型為:runtime.timer,新建定時器會調用runtime.newTimer函數。
runtime.newTimer函數會調用func (t *timer) maybeAdd(),在此函數中將定時器放入ts堆中:
func (t *timer) maybeAdd() {// Note: Not holding any locks on entry to t.maybeAdd,// so the current g can be rescheduled to a different M and P// at any time, including between the ts := assignment and the// call to ts.lock. If a reschedule happened then, we would be// adding t to some other P's timers, perhaps even a P that the scheduler// has marked as idle with no timers, in which case the timer could// go unnoticed until long after t.when.// Calling acquirem instead of using getg().m makes sure that// we end up locking and inserting into the current P's timers.mp := acquirem()ts := &mp.p.ptr().timersts.lock()ts.cleanHead()t.lock()t.trace("maybeAdd")when := int64(0)wake := falseif t.needsAdd() {t.state |= timerHeapedwhen = t.whenwakeTime := ts.wakeTime()wake = wakeTime == 0 || when < wakeTimets.addHeap(t)}t.unlock()ts.unlock()releasem(mp)if wake {wakeNetPoller(when)}
}
入堆:
// addHeap adds t to the timers heap.
// The caller must hold ts.lock or the world must be stopped.
// The caller must also have checked that t belongs in the heap.
// Callers that are not sure can call t.maybeAdd instead,
// but note that maybeAdd has different locking requirements.
func (ts *timers) addHeap(t *timer) {assertWorldStoppedOrLockHeld(&ts.mu)// Timers rely on the network poller, so make sure the poller// has started.if netpollInited.Load() == 0 {netpollGenericInit()}if t.ts != nil {throw("ts set in timer")}t.ts = tsts.heap = append(ts.heap, timerWhen{t, t.when})ts.siftUp(len(ts.heap) - 1)if t == ts.heap[0].timer {ts.updateMinWhenHeap()}
}
timers堆為每P持有,保存P隊列中協程定義的定時器。
// A timers is a per-P set of timers.
type timers struct {// mu protects timers; timers are per-P, but the scheduler can// access the timers of another P, so we have to lock.mu mutex// heap is the set of timers, ordered by heap[i].when.// Must hold lock to access.heap []timerWhen// len is an atomic copy of len(heap).len atomic.Uint32// zombies is the number of timers in the heap// that are marked for removal.zombies atomic.Int32// raceCtx is the race context used while executing timer functions.raceCtx uintptr// minWhenHeap is the minimum heap[i].when value (= heap[0].when).// The wakeTime method uses minWhenHeap and minWhenModified// to determine the next wake time.// If minWhenHeap = 0, it means there are no timers in the heap.minWhenHeap atomic.Int64// minWhenModified is a lower bound on the minimum// heap[i].when over timers with the timerModified bit set.// If minWhenModified = 0, it means there are no timerModified timers in the heap.minWhenModified atomic.Int64
}type timerWhen struct {timer *timerwhen int64
}
創建定時器堆棧如圖:
定時器觸發流程:
timers堆的定時器通過func (ts *timers) run(now int64) int64出堆并運行。
而檢查是否有定時器到期是通過函數func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool)中的func (ts *timers) wakeTime() int64進行的。
check函數和wakeTime函數的調度時機在runtime/proc.go文件中多處存在,如runtime.findRunnable()、runtime.stealWork(now int64)、runtime.schedule()等。
這種依賴協程調度、系統調用等觸發的定時器檢查,延遲時間最多可達到func sysmon()協程的間隔時間10ms。
觸發定時器堆棧如圖:
另外在新建定時器時,也會檢查timers堆頂部的定時器剩余時間,如果已經到期也會立刻通過runtime.wakeNetPoller(when int64)觸發runtime.netpoll(delay int64)返回,檢查是否存在可處理的event,然后進行timers堆的定時器check。
定時器精度小結:
golang內置的Timer定時器維護在用戶態,比較輕量,依賴協程調度、系統調用、event等來觸發時間到期檢查,延遲在10ms以內,精度不高。
定時器的觀測:
修改源碼創建多個ctx定時器:
package mainimport ("context""fmt""time"
)var ctx context.Contextfunc main() {timeout := 300 * time.Secondctx, _ = context.WithTimeout(context.Background(), timeout)ctx, _ = context.WithTimeout(ctx, 180*time.Second)deadline, _ := ctx.Deadline()fmt.Println("process start", time.Now().Format(time.DateTime))fmt.Println("ctx deadline", deadline.Format(time.DateTime))go func() {defer func() {fmt.Println("goroutine exit", time.Now().Format(time.DateTime))}()for {select {case <-ctx.Done():fmt.Println("ctx.Done", time.Now().Format(time.DateTime))returndefault:fmt.Println("do something start", time.Now().Format(time.DateTime))time.Sleep(5 * time.Second)fmt.Println("do something end ", time.Now().Format(time.DateTime))}}}()time.Sleep(timeout + 10*time.Second)fmt.Println("process exit", time.Now().Format(time.DateTime))
}
dlv調試:
1、查看當前的定時器數量:
p runtime.allp[1].timers.heap
2、查看每個定時器的超時時間:
p (runtime.allp[1].timers.heap[0].when - time.startNano)/int64(time.Second)
p (runtime.allp[1].timers.heap[0].when - time.startNano)/int64(time.Second)
p (runtime.allp[1].timers.heap[0].when - time.startNano)/int64(time.Second)
3、調用其中一個定時器的回調函數:
call runtime.allp[1].timers.heap[0].timer.arg.(func())()
4、查看控制臺輸出:
共有3個定時器,分別是ctx的2個和主協程的time.Sleep,其中timers堆頂是180s的定時器。
在手工調用timers堆頂定時器的回調函數后,提前收到ctx.Done通知,程序提前退出。
如圖:
cancelCtx的父子關系:
繼續上面的例子:
1、查看ctx兩個定時器的回調函數是否一致:
p runtime.allp[1].timers.heap[0].timer.arg
p runtime.allp[1].timers.heap[1].timer.arg
2、查看父子cancelCtx變量內容:
#子ctx
p ctx
#父ctx
p ctx.cancelCtx.Context
3、觀測結果說明:
父子cancelCtx回調函數內部引用的外部變量context.timerCtx并不相同。
父子cancelCtx之間是嵌套關系,子嵌套(繼承)父。
最終使用的ctx為子ctx,子ctx的任一層父ctx的超時都會導致子ctx退出。
如圖:
父子cancelCtx的嵌套關系通過函數func (c *cancelCtx) propagateCancel(parent Context, child canceler)完成:
// propagateCancel arranges for child to be canceled when parent is.
// It sets the parent context of cancelCtx.
func (c *cancelCtx) propagateCancel(parent Context, child canceler) {c.Context = parentdone := parent.Done()if done == nil {return // parent is never canceled}select {case <-done:// parent is already canceledchild.cancel(false, parent.Err(), Cause(parent))returndefault:}if p, ok := parentCancelCtx(parent); ok {// parent is a *cancelCtx, or derives from one.p.mu.Lock()if p.err != nil {// parent has already been canceledchild.cancel(false, p.err, p.cause)} else {if p.children == nil {p.children = make(map[canceler]struct{})}p.children[child] = struct{}{}}p.mu.Unlock()return}if a, ok := parent.(afterFuncer); ok {// parent implements an AfterFunc method.c.mu.Lock()stop := a.AfterFunc(func() {child.cancel(false, parent.Err(), Cause(parent))})c.Context = stopCtx{Context: parent,stop: stop,}c.mu.Unlock()return}goroutines.Add(1)go func() {select {case <-parent.Done():child.cancel(false, parent.Err(), Cause(parent))case <-child.Done():}}()
}
--end--