文章

限流算法

限流算法与Redis实现:固定窗口、滑动窗口、漏桶、令牌桶

限流算法

固定窗口

统计一个时间窗口的请求计数,若计数超过阈值则限流,下个时间窗口计数清零

  • 优点:实现简单,性能好
  • 缺点:无法应对临界问题(窗口切换时可能允许双倍流量通过)
  • 适用:限流精度要求不高的场景

基于String(key为固定窗口,val为窗口访问计数):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- KEYS[1]:限流key
local rateLimitKey = KEYS[1]

-- ARGV[1]:限流阈值
-- ARGV[2]:时间窗口长度
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])

-- 1. 固定窗口增加计数
local cnt = redis.call('incr', rateLimitKey)

-- 2. 设置过期以实现计数自动清零
if cnt == 1 then
  redis.call('expire', rateLimitKey, window)
end

-- 3. 若计数增加后未超过限流阈值,则允许流量通过
return cnt <= limit

滑动窗口

统计多个时间窗口的请求计数,若计数总和超过阈值则限流,根据时间不断移除旧时间窗口和添加新时间窗口

  • 优点:能够有效缓解固定窗口的“临界问题”,流量相对固定窗口平滑
  • 缺点:针对突发流量的处理能力较弱(突发异常流量会使使窗口计数迅速到达阈值,导致后续正常流量被拒绝)
  • 适用:限流精度要求相对高且不关心突发流量的场景

基于Sorted Set(member为请求ID,score为时间戳):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
-- KEYS[1]:限流key
local rateLimitKey = KEYS[1]

-- ARGV[1]:限流阈值
-- ARGV[2]:时间窗口长度
-- ARGV[3]:请求时间戳(即当前时间戳)
-- ARGV[4]:请求ID
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local reqId = tonumber(ARGV[4])

-- 1. 剔除过期计数
redis.call('zremrangebyscore', rateLimitKey, 0, now - window)

-- 2. 检查滑动窗口计数是否已经到达阈值, 若已到达阈值则拒绝流量
local count = redis.call('zcard', rateLimitKey) or 0
if count >= limit then
  return false
end

-- 3. 增加滑动窗口计数,并允许流量通过
redis.call('zadd', rateLimitKey, reqId, now)
return true

漏桶

请求时往桶里加入水滴(桶满时拒绝或阻塞),系统以恒定速率漏出水滴,需要等待到水滴漏出时才处理请求

  • 优点:以恒定速率处理请求,流量平滑
  • 缺点:无法应对突发流量,即使系统仍有余力,可能导致系统资源利用率低
  • 适用:需要严格限流的场景(如数据库访问限流)

基于Hash(维护漏桶状态):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
-- ================================ Redis Server ================================

-- KEYS[1]:限流key
local rateLimitKey = KEYS[1]

-- ARGV[1]:请求时间戳(即当前时间戳)
-- ARGV[2]:水位上限
-- ARGV[3]:漏水速率
local now = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local leakRate = tonumber(ARGV[3])

-- Hash字段1:waters,当前水位,初始为0,下限为0,上限为capacity
-- Hash字段2:lastTime,上次访问时间
local info = redis.call('hmget', rateLimitKey, 'waters', 'lastTime') or {}
local waters = tonumber(info[1]) or 0
local lastTime = tonumber(info[2]) or now

-- 1. 计算这段时间的漏水量
local prevWaters = math.max(0, waters - leakRate * (now - lastTime))

-- 2. 尝试加入水滴,以及更新waters和lastTime
local nextWaters = prevWaters + 1
if nextWaters > capacity then
  redis.call('hmset', rateLimitKey, 'waters', prevWaters, 'lastTime', now)
  -- 触发限流,返回-1
  return -1
else
  redis.call('hmset', rateLimitKey, 'waters', nextWaters, 'lastTime', now)
  -- 向Redis Client返回水滴漏出的等待时间(之前水量 / 漏水速率)
  return prevWaters / leakRate
end
1
2
3
4
5
6
7
8
9
10
11
12
// ================================ Redis Client ================================

// 向漏桶添加水滴
delay := AcquireLeakBucket(time.Now().Unix(), capacity, leakRate)
// 处理限流报错
if delay < 0 {
  return 429
}
// sleep等待水滴漏出,保证处理速率恒定
sleep(delay)
// ... do sth
return 200

令牌桶

系统以恒定速率填充令牌到桶里,请求时从桶里获取令牌(桶空时拒绝或阻塞),获取令牌成功才处理请求

  • 优点:与漏桶相较,流量同样平滑并且能够应对偶尔的突发流量
  • 缺点:无法应对持续的突发流量
  • 适用:系统平时稳定,偶尔需要应对突发流量

基于Hash(维护令牌桶状态):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- KEYS[1]:限流key
local rateLimitKey = KEYS[1]

-- ARGV[1]:请求时间戳(即当前时间戳)
-- ARGV[2]:令牌数上限
-- ARGV[3]:填充速率
local now = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local fillRate = tonumber(ARGV[3])

-- Hash字段1:tokens,当前令牌数,初始为capacity,下限为0,上限为capacity
-- Hash字段2:lastTime,上次访问时间
local info = redis.call('hmget', rateLimitKey, 'tokens', 'lastTime')
local tokens = tonumber(info[1]) or capacity
local lastTime = tonumber(info[2]) or now

-- 1. 计算这段时间的填充数
local prevTokens = min(capacity, tokens + fillRate * (now - lastTime))

-- 2. 尝试获取令牌,以及更新tokens和lastTime
local nextTokens = prevTokens - 1
if nextTokens < 0 then
  redis.call('hmset', rateLimitKey, 'tokens', prevTokens, 'lastTime', now)
  return false
else
  redis.call('hmset', rateLimitKey, 'tokens', nextTokens, 'lastTime', now)
  return true
end
本文由作者按照 CC BY 4.0 进行授权