文章目錄
- 案例分析
- gorm源碼解讀
- gin context 生命周期
- context什么時候cancel的
- 什么時候context會被動cancel掉呢?
- 野生協程如何處理
案例分析
報錯信息
{"L":"ERROR","T":"2024-12-17T11:11:33.005+0800","file":"*/log.go:61","message":"sql_trace","__type":"sql","trace_id":"6ab69b5d333de40c8327d8572336fa2c","error":"context canceled; invalid connection","elapsed":"2.292ms","rows":0,"sql":"UPDATE `logs` SET `response_time`=1734405092,`status`='success' WHERE id = 226081"
}
案發代碼:
func Sync(c *gin.Context) {var params services.Params// 參數綁定c.ShouldBindBodyWith(¶ms, binding.JSON)// 參數效驗// 記錄日志...// 開協程 更新日志go func() {defer helpers.Recovery(c)models.Log{Ctx: c.Request.Context()}.UpdateLog(logId, res)}()c.JSON(200, response.Success(nil))return
}func UpdateLog(id uint, r *services.ResJson) bool {exec := models.DefaultDB().WithContext(s.Ctx).Where("id = ?", id).Model(&Log{}).Updates(map[string]interface{}{"status": StatusSuccess,"response_time": time.Now().Unix(),})return exec.RowsAffected > 0
}
更新數據庫時,開了一個協程去更新,并且把請求的context(c.Request.Context())傳給了gorm。
gorm源碼解讀
gorm Find、Update方法會觸發GORM內部的處理器鏈,其中包括構建SQL語句、準備參數等。
最終,會調用到processor.Execute(db *DB)方法,這個方法會遍歷并執行一系列注冊的回調函數。
gorm.io/gorm@v1.25.11/finisher_api.go
// Update updates column with value using callbacks. Reference: https://gorm.io/docs/update.html#Update-Changed-Fields
func (db *DB) Update(column string, value interface{}) (tx *DB) {
	tx = db.getInstance()
	tx.Statement.Dest = map[string]interface{}{column: value}
	// Hand the statement to the update callback chain.
	return tx.callbacks.Update().Execute(tx)
}

// gorm.io/gorm@v1.25.11/callbacks.go

// Execute calls every function in p.fns, in slice order, with db.
// (Excerpt: the code before the loop and the return statement are elided.)
func (p *processor) Execute(db *DB) *DB {
	// ...
	for _, f := range p.fns {
		f(db)
	}
}
// 注冊回調函數
gorm@v1.25.11/callbacks/callbacks.go
// RegisterDefaultCallbacks registers the default create, query, delete and
// update callbacks on db. Clause lists left empty in config are filled with
// the package defaults first. (Excerpt: the tail of the function is elided.)
func RegisterDefaultCallbacks(db *gorm.DB, config *Config) {
	// Transaction callbacks are only matched when SkipDefaultTransaction is false.
	enableTransaction := func(db *gorm.DB) bool {
		return !db.SkipDefaultTransaction
	}

	if len(config.CreateClauses) == 0 {
		config.CreateClauses = createClauses
	}
	if len(config.QueryClauses) == 0 {
		config.QueryClauses = queryClauses
	}
	if len(config.DeleteClauses) == 0 {
		config.DeleteClauses = deleteClauses
	}
	if len(config.UpdateClauses) == 0 {
		config.UpdateClauses = updateClauses
	}

	createCallback := db.Callback().Create()
	createCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction)
	createCallback.Register("gorm:before_create", BeforeCreate)
	createCallback.Register("gorm:save_before_associations", SaveBeforeAssociations(true))
	createCallback.Register("gorm:create", Create(config))
	createCallback.Register("gorm:save_after_associations", SaveAfterAssociations(true))
	createCallback.Register("gorm:after_create", AfterCreate)
	createCallback.Match(enableTransaction).Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction)
	createCallback.Clauses = config.CreateClauses

	queryCallback := db.Callback().Query()
	queryCallback.Register("gorm:query", Query)
	queryCallback.Register("gorm:preload", Preload)
	queryCallback.Register("gorm:after_query", AfterQuery)
	queryCallback.Clauses = config.QueryClauses

	deleteCallback := db.Callback().Delete()
	deleteCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction)
	deleteCallback.Register("gorm:before_delete", BeforeDelete)
	deleteCallback.Register("gorm:delete_before_associations", DeleteBeforeAssociations)
	deleteCallback.Register("gorm:delete", Delete(config))
	deleteCallback.Register("gorm:after_delete", AfterDelete)
	deleteCallback.Match(enableTransaction).Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction)
	deleteCallback.Clauses = config.DeleteClauses

	updateCallback := db.Callback().Update()
	updateCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction)
	updateCallback.Register("gorm:setup_reflect_value", SetupUpdateReflectValue)
	updateCallback.Register("gorm:before_update", BeforeUpdate)
	updateCallback.Register("gorm:save_before_associations", SaveBeforeAssociations(false))
	// "gorm:update" is the callback that actually sends the UPDATE statement.
	updateCallback.Register("gorm:update", Update(config))
	updateCallback.Register("gorm:save_after_associations", SaveAfterAssociations(false))
	updateCallback.Register("gorm:after_update", AfterUpdate)
	// ....
}
gorm.io/gorm@v1.25.11/callbacks/update.go
// Update update hook
//
// Update returns the callback that builds the UPDATE statement (when no SQL
// has been built yet) and runs it on db.Statement.ConnPool, passing
// db.Statement.Context to the driver call.
func Update(config *Config) func(db *gorm.DB) {
	supportReturning := utils.Contains(config.UpdateClauses, "RETURNING")

	return func(db *gorm.DB) {
		// An earlier callback already failed; do nothing.
		if db.Error != nil {
			return
		}

		if db.Statement.Schema != nil {
			for _, c := range db.Statement.Schema.UpdateClauses {
				db.Statement.AddClause(c)
			}
		}

		// Build the SQL only if nothing has been written to the statement yet.
		if db.Statement.SQL.Len() == 0 {
			db.Statement.SQL.Grow(180)
			db.Statement.AddClauseIfNotExists(clause.Update{})
			if _, ok := db.Statement.Clauses["SET"]; !ok {
				if set := ConvertToAssignments(db.Statement); len(set) != 0 {
					defer delete(db.Statement.Clauses, "SET")
					db.Statement.AddClause(set)
				} else {
					// Nothing to assign, so there is no statement to run.
					return
				}
			}

			db.Statement.Build(db.Statement.BuildClauses...)
		}

		checkMissingWhereConditions(db)

		if !db.DryRun && db.Error == nil {
			if ok, mode := hasReturning(db, supportReturning); ok {
				// RETURNING path: the statement goes through QueryContext, which
				// receives db.Statement.Context as its context.Context argument.
				if rows, err := db.Statement.ConnPool.QueryContext(db.Statement.Context, db.Statement.SQL.String(), db.Statement.Vars...); db.AddError(err) == nil {
					dest := db.Statement.Dest
					db.Statement.Dest = db.Statement.ReflectValue.Addr().Interface()
					gorm.Scan(rows, db, mode)
					db.Statement.Dest = dest
					db.AddError(rows.Close())
				}
			} else {
				// Plain path: the statement goes through ExecContext with the
				// same db.Statement.Context.
				result, err := db.Statement.ConnPool.ExecContext(db.Statement.Context, db.Statement.SQL.String(), db.Statement.Vars...)

				if db.AddError(err) == nil {
					db.RowsAffected, _ = result.RowsAffected()
				}
			}
		}
	}
}
調用數據庫驅動:
Update回調最終會調用到底層連接池的ExecContext方法(帶RETURNING子句時則是QueryContext),這兩個方法的第一個參數都是context.Context,傳入的正是db.Statement.Context。下面以QueryContext為例,ExecContext的流程類似。
go1.22.3/src/database/sql/sql.go:1727
// QueryContext executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
	var rows *Rows
	var err error

	// db.retry invokes the closure with a connection reuse strategy; ctx is
	// passed through to db.query unchanged.
	err = db.retry(func(strategy connReuseStrategy) error {
		rows, err = db.query(ctx, query, args, strategy)
		return err
	})

	return rows, err
}
底層數據庫連接:
QueryContext方法通過db.retry處理數據庫連接的重試邏輯,并在其中調用query方法。
在query方法中,會調用conn方法來獲取一個數據庫連接,并在這個連接上執行查詢。
conn方法會處理context的取消和超時信號,如果context被取消或超時,它會中斷數據庫連接操作并返回錯誤。
go1.22.3/src/database/sql/sql.go:1748
// query obtains a connection via db.conn using ctx and strategy, then runs the
// query on it with db.queryDC. An error from db.conn is returned as is.
func (db *DB) query(ctx context.Context, query string, args []any, strategy connReuseStrategy) (*Rows, error) {
	dc, err := db.conn(ctx, strategy)
	if err != nil {
		return nil, err
	}

	return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {db.mu.Lock()if db.closed {db.mu.Unlock()return nil, errDBClosed}// Check if the context is expired.select {default:case <-ctx.Done():db.mu.Unlock()return nil, ctx.Err()}
那為什么會出現context canceled?
gin context 生命周期
大多數情況下,context一直能持續到請求結束
當請求所在的連接讀取出錯時(例如客戶端提前斷開),context會立刻被cancel掉
context什么時候cancel的
server端接受新請求時會起一個協程go c.serve(connCtx)
// Serve accepts connections from l in a loop and starts one goroutine per
// accepted connection. (Excerpt: setup and error handling are elided.)
func (srv *Server) Serve(l net.Listener) error {
	// ...
	for {
		rw, err := l.Accept()
		connCtx := ctx
		// ...
		// Each connection is served on its own goroutine.
		go c.serve(connCtx)
	}
}
協程里面for循環從鏈接中讀取請求,重點是這里每次讀取到請求的時候都會啟動后臺協程(w.conn.r.startBackgroundRead())繼續從鏈接中讀取。
// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
	// ...
	// HTTP/1.x from here on.

	// Connection-level cancelable context; its cancel func is kept on the conn
	// and also deferred so it runs when serve returns.
	ctx, cancelCtx := context.WithCancel(ctx)
	c.cancelCtx = cancelCtx
	defer cancelCtx()
	// ...
	for {
		// Read the next request from the connection.
		w, err := c.readRequest(ctx)
		if c.r.remain != c.server.initialReadLimitSize() {
			// If we read any bytes off the wire, we're active.
			c.setState(c.rwc, StateActive, runHooks)
		}
		// ....
		// Start the background read on the connection, either right away or
		// once the request body hits EOF.
		if requestBodyRemains(req.Body) {
			registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
		} else {
			w.conn.r.startBackgroundRead()
		}
		// ...
		// Dispatch to the server's handler; with gin this is Engine.ServeHTTP.
		serverHandler{c.server}.ServeHTTP(w, w.req)
		// Cancel the context once the handler has returned.
		w.cancelCtx()
		// ...
	}
}
gin中執行ServeHTTP方法
// ServeHTTP conforms to the http.Handler interface.
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	// ...
	// Run the handlers registered by the application.
	engine.handleHTTPRequest(c)
	// ...
}
正常請求結束之后,net/http會主動cancel掉request context(上面的w.cancelCtx());gin這邊則是把*gin.Context重置,并回收到ctx pool中。
// github.com/gin-gonic/gin@v1.7.7/gin.go

// ServeHTTP conforms to the http.Handler interface.
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	// Take a *Context from the pool and prepare it for this request.
	c := engine.pool.Get().(*Context)
	c.writermem.reset(w)
	c.Request = req
	c.reset()

	engine.handleHTTPRequest(c)

	// Return the *Context to the pool as soon as the handlers are done.
	engine.pool.Put(c)
}

// github.com/gin-gonic/gin@v1.7.7/context.go

// reset clears the per-request fields of c; it is called from ServeHTTP before
// the handlers run.
func (c *Context) reset() {
	c.Writer = &c.writermem
	c.Params = c.Params[0:0]
	c.handlers = nil
	c.index = -1

	c.fullPath = ""
	c.Keys = nil
	c.Errors = c.Errors[0:0]
	c.Accepted = nil
	c.queryCache = nil
	c.formCache = nil
	*c.params = (*c.params)[:0]
	*c.skippedNodes = (*c.skippedNodes)[:0]
}
什么時候context會被動cancel掉呢?
秘密就在w.conn.r.startBackgroundRead()這個后臺讀取的協程里了。
// startBackgroundRead starts backgroundRead on a new goroutine.
// (Excerpt: the preceding checks are elided.)
func (cr *connReader) startBackgroundRead() {
	// ...
	go cr.backgroundRead()
}

// backgroundRead performs a single Read on the underlying connection and
// passes any error other than the expected abort timeout to handleReadError.
func (cr *connReader) backgroundRead() {
	n, err := cr.conn.rwc.Read(cr.byteBuf[:])
	// ...
	if ne, ok := err.(net.Error); ok && cr.aborted && ne.Timeout() {
		// Ignore this error. It's the expected error from
		// another goroutine calling abortPendingRead.
	} else if err != nil {
		cr.handleReadError(err)
	}
	// ...
}

// handleReadError cancels the connection's context and then calls closeNotify.
func (cr *connReader) handleReadError(_ error) {
	// This is where the context gets canceled.
	cr.conn.cancelCtx()
	cr.closeNotify()
}
調用鏈:startBackgroundRead -> backgroundRead -> handleReadError。在handleReadError函數里面會把context cancel掉。
當服務端在處理業務的同時,后臺有個協程監控鏈接的狀態,如果鏈接有問題就會把context cancel掉。(cancel的目的就是快速失敗——業務不用處理了,就算服務端返回結果了,客戶端也不處理了)
野生協程如何處理
- http請求中如有野生協程,不能使用request context(因為response之后context就會被cancel掉了),應當使用獨立的context(比如 context.Background();Go 1.21 及以上也可以用 context.WithoutCancel(c.Request.Context()),既保留原context中的value,又不會跟隨請求被cancel)
- 禁用野生協程,控制協程生命周期