常用限流算法
包括固定窗口、滑动窗口、令牌桶、漏桶
固定窗口:
将时间划分为固定长度的窗口(如 1 秒),窗口内维护请求计数,当请求数超过阈值时拒绝新请求。实现简单,但存在窗口临界值问题;就是大量请求聚集在第一个窗口的最后和第二个窗口的最前
优点: 实现简单,开销小。
缺点: 存在典型的“窗口边界问题”:若大量请求集中在窗口末尾 + 下一个窗口开始,会出现瞬时突刺(burst),导致实际请求量明显超过设定值。
滑动窗口:
将固定窗口拆分为多个小时间片,通过滑动窗口覆盖的时间片总和作为最大请求量,这样就解决了窗口临界峰值问题,但精度越高(时间片越多),计算和存储开销越大
优点: 精度高,限流更平滑。
缺点: 越高精度(时间片越多),存储和计算成本越高。
漏桶算法:
请求进来后直接进入漏桶排序,漏桶以固定的顺序处理请求,如果新增请求的速率大于漏桶漏出的速率,多余的请求会溢出
特征: 输出速率固定,可平滑流量,但不允许突发。
令牌桶算法:
以固定的速率产生令牌,直到桶内的令牌满了。新请求进来后取出一个令牌才会放行,如果没有取到令牌则进入一个队列等待,如果等待队列满了,新请求会被直接抛弃。
特征: 允许突发(桶内累积的令牌),更灵活。
基于Redis的滑动窗口限流实现
需求分析
- 滑动窗口算法的基本要点是回溯一段时间,得到这段时间内请求数量;
- 将不在窗口内的时间戳删除,以节省内存
比如:假设窗口大小为1分钟,限制10个请求,那么也就是回溯一分钟看看过去1分钟是不是已经有了10个请求;
思路:记录每次请求的时间戳,插入队列中,查询时判断队列中数据量即可
选择数据结构
什么样的结构方便回溯一段窗口,并计算数量呢?
- list
- zset
更加方便删除, 有个命令ZCount轻松获取范围内的值;ZRemRangeByScore删除范围外的值,因此大多数都会选择zset
定义限流器对象
type MiddlewareBuilder struct {client redis.CmdablekeyGenFunc func(ctx *gin.Context) string // 限流对象-基于字符串,可以是IP、UUID、ServerName等windowSize time.Duration // 窗口大小limit int64 // 限流阈值
}
这里封装为gin框架中间件的形式:
func NewMiddlewareBuilder(client redis.Cmdable, windowSize time.Duration, limit int64) *MiddlewareBuilder {builder := &MiddlewareBuilder{client: client,keyGenFunc: func(ctx *gin.Context) string {return ctx.ClientIP()},windowSize: windowSize,limit: limit,}return builder
}
V1版本
很简单,共有5个步骤:
- 统计最近窗口内的请求数
- 判断是否超限,超限直接返回
- 记录当前请求,插入zset
- 删掉没用的窗口外的数据,就是当前时刻减去窗口大小之前的数据
- 更新过期时间,因为限流往往是短时间集中的
func (b *MiddlewareBuilder) LimitMiddlewareV1() gin.HandlerFunc {return func(ctx *gin.Context) {now := time.Now().UnixNano()windowStart := now - b.windowSize.Nanoseconds()key := b.keyGenFunc(ctx)// 1. 统计窗口内的请求数(同时获取结果和错误)count, err := b.client.ZCount(ctx, key, fmt.Sprintf("%d", windowStart), fmt.Sprintf("%d", now)).Result()if err != nil {ctx.AbortWithStatus(500)return}// 2. 处理限流请求if count >= b.limit {ctx.AbortWithStatusJSON(429, gin.H{"error": "rate limit exceeded",})return}// 3. 记录当前请求_, err = b.client.ZAddNX(ctx, key, redis.Z{Score: float64(now),Member: fmt.Sprintf("%d", now),}).Result()if err != nil {ctx.AbortWithStatus(500)return}// 4. 过期请求清理 (清理窗口外的请求)_, err = b.client.ZRemRangeByScore(ctx, key, "0", fmt.Sprintf("%d", windowStart)).Result()if err != nil {ctx.AbortWithStatus(500)return}// 5. 设置过期时间b.client.Expire(ctx, key, b.windowSize * 3)ctx.Next()}
}
但是这样会有并发问题:
- 请求 A 调用 ZCOUNT 判断未超限
- 请求 B 也判断未超限
- 两个请求一起通过,导致实际请求数超过限制
解决方式:
考虑事务?
但是redis的事务机制并不是真正原子性,比如事务中某条命令执行错误,其他命令照样可以执行成功;另外不支持回滚操作
因此更好方式基于lua脚本实现,实现很简单,以字符串方式即可,当然如果命令较多,可以单独保存到一个文件中:
const luaScript = `
local key = KEYS[1]
local windowStart = tonumber(ARGV[1])
local now = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local random = ARGV[4] -- 新增随机数参数-- 生成唯一 member:毫秒时间 + 随机数
local member = now .. ":" .. random-- 统计窗口内的请求数
local count = redis.call('zcount', key, windowStart, now)
if count >= limit thenreturn 'false'
end-- 记录当前请求(member 唯一,不会覆盖)
redis.call('zadd', key, now, member)-- 清理窗口外的请求
redis.call('zremrangebyscore', key, '-inf', windowStart)-- 设置过期时间(窗口大小的 3 倍,单位秒)
redis.call('expire', key, math.ceil((now - windowStart) / 1000 * 3))return 'true'
`
然后执行方式是使用eval:
func (b *MiddlewareBuilder) LimitMiddlewareV2() gin.HandlerFunc {return func(ctx *gin.Context) {key := b.keyGenFunc(ctx)now := time.Now().UnixMilli()windowStart := now - b.windowSize.Milliseconds()random := rand.Intn(1000000)// 1. 执行 Lua 脚本allowed, err := b.client.Eval(ctx, luaScript, []string{key}, windowStart, now, b.limit, random).Bool()if err != nil {ctx.AbortWithStatus(500)fmt.Println(err)return}if !allowed {ctx.AbortWithStatusJSON(429, gin.H{"error": "rate limit exceeded",})return}}
}
运行& shell测试
func f6() {redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379",Password: "",DB: 0,})limiter := ratelimit.NewMiddlewareBuilder(redisClient, time.Second * 10, 5)r := gin.Default()r.GET("/test-v1", limiter.LimitMiddlewareV1(), func(ctx *gin.Context) {ctx.JSON(200, gin.H{"msg": "success-v1"})})r.GET("/test-v2", limiter.LimitMiddlewareV2(), func(ctx *gin.Context) {ctx.JSON(200, gin.H{"msg": "success-v2"})})r.Run(":8080")
}
测试
上述代码调用了限流中间件,并且窗口大小设置为10s,阈值设置为5,执行shell脚本测试V2:
for i in {1..20}; docurl -s -o /dev/null -w "%{http_code}\n" http://127.0.0.1:8080/test-v2 &
done
wait
结果:
(base) xing@xing-2 ttt % sh ./test.sh
200
200
200
200
429
200
429
429
429
429
429
429
429
429
429
429
429
429
429
429
只有5个请求得到200状态码,其余被限流返回了429。
修改shell脚本测试v1接口,结果如下:
(base) xing@xing-2 ttt % sh ./test.sh
(base) xing@xing-2 ttt % sh ./test.sh
429
429
429
429
429
429
429
429
429
429
200
200
200
200
200
200
200
200
200
200
每次200的请求不相同,但是大都超过5个,并发问题明显!