本篇文章帶大家聊聊go語言中的限流漏桶和令牌桶庫,介紹令牌桶和漏桶的實現(xiàn)原理以及在實際項目中簡單應(yīng)用。
為什么需要限流中間件?
在大數(shù)據(jù)量高并發(fā)訪問時,經(jīng)常會出現(xiàn)服務(wù)或接口面對大量的請求而導(dǎo)致數(shù)據(jù)庫崩潰的情況,甚至引發(fā)連鎖反映導(dǎo)致整個系統(tǒng)崩潰。或者有人惡意攻擊網(wǎng)站,大量的無用請求出現(xiàn)會導(dǎo)致緩存穿透的情況出現(xiàn)。使用限流中間件可以在短時間內(nèi)對請求進行限制數(shù)量,起到降級的作用,從而保障了網(wǎng)站的安全性。
應(yīng)對大量并發(fā)請求的策略?
-
使用消息中間件進行統(tǒng)一限制(降速)
-
使用限流方案將多余請求返回(限流)
-
升級服務(wù)器
-
緩存(但仍然有緩存穿透等危險)
-
等等
可以看出在代碼已經(jīng)無法提升的情況下,只能去提升硬件水平?;蛘吒膭蛹軜?gòu)再加一層!也可以使用消息中間件統(tǒng)一處理。而結(jié)合看來,限流方案是一種既不需要大幅改動也不需要高額開銷的策略。
常見的限流方案
-
令牌桶算法
-
漏桶算法
-
滑動窗口算法
-
等等
漏桶
引入ratelimit庫
go get -u go.uber.org/ratelimit
庫函數(shù)源代碼
// New returns a Limiter that will limit to the given RPS. func New(rate int, opts ...Option) Limiter { return newAtomicBased(rate, opts...) } // newAtomicBased returns a new atomic based limiter. func newAtomicBased(rate int, opts ...Option) *atomicLimiter { // TODO consider moving config building to the implementation // independent code. config := buildConfig(opts) perRequest := config.per / time.Duration(rate) l := &atomicLimiter{ perRequest: perRequest, maxSlack: -1 * time.Duration(config.slack) * perRequest, clock: config.clock, } initialState := state{ last: time.Time{}, sleepFor: 0, } atomic.StorePointer(&l.state, unsafe.Pointer(&initialState)) return l }
該函數(shù)使用了函數(shù)選項模式對多個結(jié)構(gòu)體對象進行初始化
根據(jù)傳入的值來初始化一個桶結(jié)構(gòu)體 rate
為int
傳參 。
初始化過程中包括了
- 每一滴水需要的時間
perquest = config.per / time.Duration(rate)
maxSlack
寬松度(寬松度為負值)-1 * time.Duration(config.slack) * perRequest
松緊度是用來規(guī)范等待時間的
// Clock is the minimum necessary interface to instantiate a rate limiter with // a clock or mock clock, compatible with clocks created using // github.com/andres-erbsen/clock. type Clock interface { Now() time.Time Sleep(time.Duration) }
同時還需要結(jié)構(gòu)體Clock
來記錄當(dāng)前請求的時間now
和此刻的請求所需要花費等待的時間sleep
type state struct { last time.Time sleepFor time.Duration }
state
主要用來記錄上次執(zhí)行的時間以及當(dāng)前執(zhí)行請求需要花費等待的時間(作為中間狀態(tài)記錄)
最重要的Take邏輯
func (t *atomicLimiter) Take() time.Time { var ( newState state taken bool interval time.Duration ) for !taken { now := t.clock.Now() previousStatePointer := atomic.LoadPointer(&t.state) oldState := (*state)(previousStatePointer) newState = state{ last: now, sleepFor: oldState.sleepFor, } if oldState.last.IsZero() { taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) continue } // 計算是否需要進行等待取水操作 newState.sleepFor += t.perRequest(每兩滴水之間的間隔時間) - now.Sub(oldState.last)(當(dāng)前時間與上次取水時間的間隔) // 如果等待取水時間特別小,就需要松緊度進行維護 if newState.sleepFor < t.maxSlack { newState.sleepFor = t.maxSlack } // 如果等待時間大于0,就進行更新 if newState.sleepFor > 0 { newState.last = newState.last.Add(newState.sleepFor) interval, newState.sleepFor = newState.sleepFor, 0 } taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState)) } t.clock.Sleep(interval) // 最后返回需要等待的時間 return newState.last }
實現(xiàn)一個Take方法
-
該Take方法會進行原子性操作(可以理解為加鎖和解鎖),在大量并發(fā)請求下仍可以保證正常使用。
-
記錄下當(dāng)前的時間
now := t.clock.Now()
-
oldState.last.IsZero()
判斷是不是第一次取水,如果是就直接將state
結(jié)構(gòu)體中的值進行返回。而這個結(jié)構(gòu)體中初始化了上次執(zhí)行時間,如果是第一次取水就作為當(dāng)前時間直接傳參。 -
如果
newState.sleepFor
非常小,就會出現(xiàn)問題,因此需要借助寬松度,一旦這個最小值比寬松度小,就用寬松度對取水時間進行維護。 -
如果
newState.sleepFor > 0
就直接更新結(jié)構(gòu)體中上次執(zhí)行時間newState.last = newState.last.Add(newState.sleepFor)
并記錄需要等待的時間interval, newState.sleepFor = newState.sleepFor, 0
。 -
如果允許取水和等待操作,那就說明沒有發(fā)生并發(fā)競爭的情況,就模擬睡眠時間
t.clock.Sleep(interval)
。然后將取水的目標(biāo)時間進行返回,由服務(wù)端代碼來判斷是否打回響應(yīng)或者等待該時間后繼續(xù)響應(yīng)。
t.clock.Sleep(interval)
func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }
實際上在一個請求來的時候,限流器就會進行睡眠對應(yīng)的時間,并在睡眠后將最新取水時間返回。
實際應(yīng)用(使用Gin框架)
func ratelimit1() func(ctx *gin.Context) { r1 := rate1.New(100) return func(ctx *gin.Context) { now := time.Now() // Take 返回的是一個 time.Duration的時間 if r1.Take().Sub(now) > 0 { // 返回的時間比當(dāng)前的時間還大,說明需要進行等待 // 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行 // 如果不需要等待請求時間,就直接進行Abort 然后返回 response(ctx, http.StatusRequestTimeout, "rate1 limit...") fmt.Println("rate1 limit...") ctx.Abort() return } // 放行 ctx.Next() } }
這里你可以進行選擇是否返回。因為Take一定會執(zhí)行sleep函數(shù),所以當(dāng)執(zhí)行take結(jié)束后表示當(dāng)前請求已經(jīng)接到了水。當(dāng)前演示使用第一種情況。
-
如果你的業(yè)務(wù)要求響應(yīng)不允許進行等待。那么可以在該請求接完水之后然后,如上例。
-
如果你的業(yè)務(wù)允許響應(yīng)等待,那么該請求等待對應(yīng)的接水時間后進行下一步。具體代碼就是將
if
中的內(nèi)容直接忽略。(建議使用)
測試代碼
這里定義了一個響應(yīng)函數(shù)和一個handler
函數(shù)方便測試
func response(c *gin.Context, code int, info any) { c.JSON(code, info) } func pingHandler(c *gin.Context) { response(c, 200, "ping ok~") }
執(zhí)行go test -run=Run -v
先開啟一個web服務(wù)
func TestRun(t *testing.T) { r := gin.Default() r.GET("/ping1", ratelimit1(), pingHandler) r.GET("/ping2", ratelimit2(), helloHandler) _ = r.Run(":4399") }
使用接口壓力測試工具go-wrk
進行測試->tsliwowicz/go-wrk: go-wrk)
在golang引入install版本可以直接通過go install github.com/tsliwowicz/go-wrk@latest
下載
使用幫助
Usage: go-wrk <options> <url> Options: -H Header to add to each request (you can define multiple -H flags) (Default ) -M HTTP method (Default GET) -T Socket/request timeout in ms (Default 1000) -body request body string or @filename (Default ) -c Number of goroutines to use (concurrent connections) (Default 10) -ca CA file to verify peer against (SSL/TLS) (Default ) -cert CA certificate file to verify peer against (SSL/TLS) (Default ) -d Duration of test in seconds (Default 10) -f Playback file name (Default <empty>) -help Print help (Default false) -host Host Header (Default ) -http Use HTTP/2 (Default true) -key Private key file name (SSL/TLS (Default ) -no-c Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false) -no-ka Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false) -no-vr Skip verifying SSL certificate of the server (Default false) -redir Allow Redirects (Default false) -v Print version details (Default false)
-t 8個線程 -c 400個連接 -n 模擬100次請求 -d 替換-n 表示連接時間
輸入
go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1
可以稍微等待一下水流積攢(壓測速度過快)。
可以看出,
89
個請求全部返回。也就是說在一段請求高峰期,不會有請求進行響應(yīng)。因此我認(rèn)為既然內(nèi)部已經(jīng)睡眠,那么就也就應(yīng)該對請求放行處理。
令牌桶
引入ratelimit
庫
go get -u github.com/juju/ratelimit
初始化
// NewBucket returns a new token bucket that fills at the // rate of one token every fillInterval, up to the given // maximum capacity. Both arguments must be // positive. The bucket is initially full. func NewBucket(fillInterval time.Duration, capacity int64) *Bucket { return NewBucketWithClock(fillInterval, capacity, nil) } // NewBucketWithClock is identical to NewBucket but injects a testable clock // interface. func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket { return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock) }
進行Bucket
桶的初始化。
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket { if clock == nil { clock = realClock{} } // 填充速率 if fillInterval <= 0 { panic("token bucket fill interval is not > 0") } // 最大令牌容量 if capacity <= 0 { panic("token bucket capacity is not > 0") } // 單次令牌生成量 if quantum <= 0 { panic("token bucket quantum is not > 0") } return &Bucket{ clock: clock, startTime: clock.Now(), latestTick: 0, fillInterval: fillInterval, capacity: capacity, quantum: quantum, availableTokens: capacity, } }
令牌桶初始化過程,初始化結(jié)構(gòu)體 fillInterval
(填充速率) cap
(最大令牌量) quannum
(每次令牌生成量)。
如果三個變量有一個小于或者等于0的話直接進行報錯返回。在最開始就將當(dāng)前令牌數(shù)初始化為最大容量。
調(diào)用
// TakeAvailable takes up to count immediately available tokens from the // bucket. It returns the number of tokens removed, or zero if there are // no available tokens. It does not block. func (tb *Bucket) TakeAvailable(count int64) int64 { tb.mu.Lock() defer tb.mu.Unlock() return tb.takeAvailable(tb.clock.Now(), count) }
調(diào)用TakeAvailable
函數(shù),傳入?yún)?shù)為需要取出的令牌數(shù)量,返回參數(shù)是實際能夠取出的令牌數(shù)量。
內(nèi)部實現(xiàn)
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 { // 如果需要取出的令牌數(shù)小于等于零,那么就返回0個令牌 if count <= 0 { return 0 } // 根據(jù)時間對當(dāng)前桶中令牌數(shù)進行計算 tb.adjustavailableTokens(tb.currentTick(now)) // 計算之后的令牌總數(shù)小于等于0,說明當(dāng)前令牌不足取出,那么就直接返回0個令牌 if tb.availableTokens <= 0 { return 0 } // 如果當(dāng)前存儲的令牌數(shù)量多于請求數(shù)量,那么就返回取出令牌數(shù) if count > tb.availableTokens { count = tb.availableTokens } // 調(diào)整令牌數(shù) tb.availableTokens -= count return count }
-
如果需要取出的令牌數(shù)小于等于零,那么就返回0個令牌
-
根據(jù)時間對當(dāng)前桶中令牌數(shù)進行計算
-
計算之后的令牌總數(shù)小于等于0,說明當(dāng)前令牌不足取出,那么就直接返回0個令牌
-
如果當(dāng)前存儲的令牌數(shù)量多于請求數(shù)量,那么就返回取出令牌數(shù)
-
調(diào)整令牌數(shù)
調(diào)整令牌
func (tb *Bucket) adjustavailableTokens(tick int64) { lastTick := tb.latestTick tb.latestTick = tick // 如果當(dāng)前令牌數(shù)大于最大等于容量,直接返回最大容量 if tb.availableTokens >= tb.capacity { return } // 當(dāng)前令牌數(shù) += (當(dāng)前時間 - 上次取出令牌數(shù)的時間) * quannum(每次生成令牌量) tb.availableTokens += (tick - lastTick) * tb.quantum // 如果當(dāng)前令牌數(shù)大于最大等于容量, 將當(dāng)前令牌數(shù) = 最大容量 然后返回 當(dāng)前令牌數(shù) if tb.availableTokens > tb.capacity { tb.availableTokens = tb.capacity } return }
-
如果當(dāng)前令牌數(shù)大于最大等于容量,直接返回最大容量
-
當(dāng)前令牌數(shù) += (當(dāng)前時間 – 上次取出令牌數(shù)的時間) * quannum(每次生成令牌量)
-
如果當(dāng)前令牌數(shù)大于最大等于容量, 將當(dāng)前令牌數(shù) = 最大容量 然后返回 當(dāng)前令牌數(shù)
實現(xiàn)原理
-
加鎖
defer
解鎖 -
判斷count(想要取出的令牌數(shù)) 是否小于等于 0,如果是直接返回 0
-
調(diào)用函數(shù)
adjustTokens
獲取可用的令牌數(shù)量 -
如果當(dāng)前可以取出的令牌數(shù)小于等于0 直接返回 0
-
如果當(dāng)前可以取出的令牌數(shù)小于當(dāng)前想要取出的令牌數(shù)(count) count = 當(dāng)前可以取出的令牌數(shù)
-
當(dāng)前的令牌數(shù) -= 取出的令牌數(shù) (count)
-
返回 count(可以取出的令牌數(shù))
額外介紹
take
函數(shù),能夠返回等待時間和布爾值,允許欠賬,沒有令牌也可以取出。
func (tb *Bucket) Take(count int64) time.Duration
takeMaxDuration
函數(shù),可以根據(jù)最大等待時間來進行判斷。
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
因為他們內(nèi)部的實現(xiàn)都基于令牌調(diào)整,我這里不做過多介紹,如果感興趣可以自行研究一下。
測試
func ratelimit2() func(ctx *gin.Context) { // 生成速率 最大容量 r2 := rate2.NewBucket(time.Second, 200) return func(ctx *gin.Context) { //r2.Take() // 允許欠賬,令牌不夠也可以接收請求 if r2.TakeAvailable(1) == 1 { // 如果想要取出1個令牌并且能夠取出,就放行 ctx.Next() return } response(ctx, http.StatusRequestTimeout, "rate2 limit...") ctx.Abort() return } }
壓測速度過于快速,在實際過程中可以根據(jù)調(diào)整令牌生成速率來進行具體限流!
小結(jié)
令牌桶可以允許自己判斷請求是否繼續(xù),內(nèi)部不會進行睡眠操作。而漏桶需要進行睡眠,并沒有提供方法讓程序員進行判斷是否放行。
【