在業務開發中,有時需要對某個操作在整個集群中限制并發度,例如限制大模型對話的并行數。基于redis zset實現計數鎖,做個筆記。
關鍵詞:并行流量控制、計數鎖
package redisutilimport ("context""fmt""math""time""github.com/go-redis/redis/v9"
)// AcquireZSetLock 借助redis zset數據結構實現分布式計數鎖。可用于計數任務運行數,防止超限。返回值:zset大小、釋放鎖的函數、錯誤信息
func AcquireZSetLock(ctx context.Context, c redis.Client, key string, element string, zsetMaxSize int,expiresIn time.Duration, syncWait time.Duration) (int, func() error, error) {ctx, cancel := context.WithTimeout(ctx, syncWait)defer cancel()for i := 0; ; i++ {select {case <-ctx.Done(): // 接到取消信號,按插入失敗處理return -1, func() error { return nil }, ctx.Err()default:}size, err := insertElementToZsetLock(ctx, c, key, element, zsetMaxSize, expiresIn)if err != nil {second := 0.4 + 0.6*math.Exp(-0.17*float64(i)) // f(i=0) = 1.0; f(i=10) = 0.5096,即第10次就會衰減到0.5096秒second = max(second, 0.5) // 最小間隔0.5秒,防止過于頻繁的請求time.Sleep(time.Duration(second*1000) * time.Millisecond)}releaseFunc := func() error {result, err := c.ZRem(context.Background(), key, element).Result()if err != nil {return fmt.Errorf("redis zrem error: %v. return=%d", err, result)}return nil}return size, releaseFunc, nil}
}// insertElementToZsetLock 插入元素到zset,并刪除已過期的元素
func insertElementToZsetLock(ctx context.Context, c redis.Client, key string, element string, zsetMaxSize int, expiresIn time.Duration) (int, error) {luaScript := `local zsetName = KEYS[1]local memberName = ARGV[1]local currentTime = tonumber(ARGV[2])local deadTime = tonumber(ARGV[3])local sizeLimit = tonumber(ARGV[4])-- 刪除已過期的元素redis.call("ZREMRANGEBYSCORE", zsetName, "-inf", currentTime)-- 獲取集合的大小local setSize = redis.call('ZCard', zsetName)-- 如果集合大小小于限制值,則添加元素,并返回集合大小if setSize < sizeLimit thenredis.call('ZAdd', zsetName, deadTime, memberName)local expireTime = deadTime - currentTimeif expireTime > 0 thenredis.call('EXPIRE', zsetName, expireTime)endreturn setSize+1endreturn -1`currentTime := time.Now().Unix()deadTime := time.Now().Add(expiresIn).Unix() // 過期時間 Unix秒ret, err := c.Do(ctx, "EVAL", luaScript, 1, key, element, currentTime, deadTime, zsetMaxSize).Result()if err != nil {return -1, err}if ret.(int64) < 0 {return zsetMaxSize, fmt.Errorf("zset size reach max size: %d", zsetMaxSize)}return int(ret.(int64)), nil
}
使用示例:
size, release, err := AcquireZSetLock(ctx, client, key, element, 10, 10*time.Second, 3*time.Second)
defer release()
if err != nil {fmt.Println(err)
}