本文目錄
- 一、限流
- 二、漏桶
- 三、令牌桶算法
- 四、Gin框架中實現令牌桶限流
一、限流
限流又稱為流量控制,也就是流控,通常是指限制到達系統的并發請求數。
限流雖然會影響部分用戶的使用體驗,但是能一定程度上保證系統的穩定性,不至于崩潰。
常見的各種廠商的公開API服務通常也會限制用戶的請求次數,比如百度地圖的API來限制請求數等。
二、漏桶
漏桶是一種比較常見的限流策略。
一句話來概括漏洞的核心就是:數據以任意速率進入漏桶,但是漏桶以固定速率(通常是預先設定的速率)將數據輸出。
有點像削峰填谷,但是它缺點也很明顯,就是不能很好處理有大量突發請求的場景。
import ("fmt""time""go.uber.org/ratelimit"
)func main() {rl := ratelimit.New(100) // per secondprev := time.Now()for i := 0; i < 10; i++ {now := rl.Take()fmt.Println(i, now.Sub(prev))prev = now}// Output:// 0 0// 1 10ms// 2 10ms// 3 10ms// 4 10ms// 5 10ms// 6 10ms// 7 10ms// 8 10ms// 9 10ms
}
上面你的代碼是uber
團隊開源的漏桶庫ratelimit
,這個庫比較簡單。
ratelimit.New(100)
創建了一個速率限制器(ratelimit.Limiter),限制頻率為每秒 100 次。這里的 100 表示每秒最多可以獲取 100 次令牌(tokens),即每秒最多可以執行 100 次操作。
prev
是一個 time.Time
類型的變量,用于記錄上一次操作的時間。初始值為當前時間。
rl.Take()
這是 ratelimit.Limiter
的一個方法,用于獲取一個令牌。如果當前時間距離上一次獲取令牌的時間小于 1/100 秒(因為每秒 100 次),Take() 方法會阻塞,直到下一個令牌可用。now 是獲取令牌時的時間點(time.Time 類型)。
now.Sub(prev)
計算當前時間點 now 和上一次時間點 prev 之間的時間差(返回 time.Duration
類型)。這個時間差表示兩次操作之間的時間間隔。
所以這段代碼的目的是通過 ratelimit.New(100) 創建一個每秒最多執行 100 次操作的速率限制器,并在循環中模擬 10 次操作。每次操作之間的時間間隔會被打印出來,用于觀察速率限制器是否按預期工作。
我們來看看這個限流器的實現源碼。
限制器是一個接口類型,其要求實現一個Take()方法:
type Limiter interface {// Take方法應該阻塞已確保滿足 RPSTake() time.Time
}
實現限制器接口的結構體定義如下,這里可以重點留意下maxSlack
字段,它在后面的Take()方法中的處理。
type limiter struct {sync.Mutex // 鎖last time.Time // 上一次的時刻sleepFor time.Duration // 需要等待的時間perRequest time.Duration // 每次的時間間隔maxSlack time.Duration // 最大的富余量clock Clock // 時鐘
}
limiter
結構體實現Limiter
接口的Take()
方法內容如下:
// Take 會阻塞確保兩次請求之間的時間走完
// Take 調用平均數為 time.Second/rate.
func (t *limiter) Take() time.Time {t.Lock()defer t.Unlock()now := t.clock.Now()// 如果是第一次請求就直接放行if t.last.IsZero() {t.last = nowreturn t.last}// sleepFor 根據 perRequest 和上一次請求的時刻計算應該sleep的時間// 由于每次請求間隔的時間可能會超過perRequest, 所以這個數字可能為負數,并在多個請求之間累加t.sleepFor += t.perRequest - now.Sub(t.last)// 我們不應該讓sleepFor負的太多,因為這意味著一個服務在短時間內慢了很多隨后會得到更高的RPS。if t.sleepFor < t.maxSlack {t.sleepFor = t.maxSlack}// 如果 sleepFor 是正值那么就 sleepif t.sleepFor > 0 {t.clock.Sleep(t.sleepFor)t.last = now.Add(t.sleepFor)t.sleepFor = 0} else {t.last = now}return t.last
}
上面的代碼根據記錄每次請求的間隔時間和上一次請求的時刻來計算當次請求需要阻塞的時間——sleepFor
,這里需要留意的是sleepFor
的值可能為負,在經過間隔時間長的兩次訪問之后會導致隨后大量的請求被放行,所以代碼中針對這個場景有專門的優化處理。創建限制器的New()
函數中會為maxSlack
設置初始值,也可以通過WithoutSlack
這個Option
取消這個默認值。
func New(rate int, opts ...Option) Limiter {l := &limiter{perRequest: time.Second / time.Duration(rate),maxSlack: -10 * time.Second / time.Duration(rate),}for _, opt := range opts {opt(l)}if l.clock == nil {l.clock = clock.New()}return l
}
三、令牌桶算法
令牌桶按固定的速率往桶里放入令牌,并且只要能從桶里取出令牌就能通過,令牌桶支持突發流量的快速處理。
如果放令牌的速率很快,那么令牌桶里邊有很多令牌了,這個時候當有大量的客戶端請求過來了,那么就可以直接取出令牌處理請求。
也就是令牌以固定的速率(例如每秒生成若干個令牌)被添加到桶中,桶有一個最大容量,當桶滿時,多余的令牌會被丟棄。
次發送數據(如網絡請求或數據包)時,需要從桶中取出一定數量的令牌。如果桶中有足夠的令牌,則允許發送數據,并從桶中扣除相應的令牌;如果令牌不足,則請求被拒絕或排隊等待。
由于桶中的令牌可以積累,因此在短時間內允許突發流量(bursty traffic),只要平均速率符合限制。
對于從桶里取不到令牌的場景,我們可以選擇等待也可以直接拒絕并返回。這個需要根據業務場景來處理,如果業務場景不同,那么相對應的處理也就需要不一樣。
可以參照github上的開源庫:github.com/juju/ratelimit庫。
創建令牌桶的方法如下:
// 創建指定填充速率和容量大小的令牌桶
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket// 創建指定填充速率、容量大小和每次填充的令牌數的令牌桶
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket// 創建填充速度為指定速率和容量大小的令牌桶
// NewBucketWithRate(0.1, 200) 表示每秒填充20個令牌
func NewBucketWithRate(rate float64, capacity int64) *Bucket
取出令牌的方法如下:
// 取token(非阻塞)
func (tb *Bucket) Take(count int64) time.Duration
func (tb *Bucket) TakeAvailable(count int64) int64// 最多等maxWait時間取token
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)// 取token(阻塞)
func (tb *Bucket) Wait(count int64)
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool
(注意這個令牌桶跟JWT算法中的Token并不是一個概念。)
雖說是令牌桶但沒有必要真的去生成令牌放到桶里,我們只需要每次來取令牌的時候計算一下,當前是否有足夠的令牌就可以了,具體的計算方式可以總結為下面的公式:
當前令牌數 = 上一次剩余的令牌數 + (本次取令牌的時刻-上一次取令牌的時刻)/放置令牌的時間間隔 * 每次放置的令牌數
四、Gin框架中實現令牌桶限流
這里我們可以將限流組件定義成中間件,因為中間件是在所有請求的必經之路上。按照需要限流的策略將中間件添加到不同的地方,如果要對全站進行限流就可以添加成全局中間件。如果某一組路由需要限流,就添加到對應的路由組就行。
func RateLimitMiddleware(fillInterval time.Duration, cap int64) func(c *gin.Context) {bucket := ratelimit.NewBucket(fillInterval, cap)return func(c *gin.Context) {// 如果取不到令牌就中斷本次請求返回 rate limit...if bucket.TakeAvailable(1) == 0 {c.String(http.StatusOK, "rate limit...")c.Abort()return}// 取到令牌就放行c.Next()}
}
cap
是填充速率,比如一秒鐘填一個等。
然后我們在路由中生成即可,這樣全網站就限速了。