大家好,我是Java烘焙师。上一篇文章介绍了限流方案的使用和选型,本文接着讲限流算法的原理。
常见的限流算法有:令牌桶、窗口计数、漏桶,前两种在实际使用中最常见,因此重点介绍。限流算法是通用的,既可以在单机上实现,也可以借助redis来实现分布式限流。
首先,定义一下限流算法需实现的基本功能:
- tryAquire尝试获取通行许可:立刻返回结果,true代表允许、false代表被限流
- aquire阻塞式获取通行许可:如果能获取到则返回,否则阻塞等待(指定超时时间,或一直等待)。当触发限流时借助
java.util.concurrent.locks.Condition的await方法来阻塞等待,当恢复时通过signalAll来唤醒阻塞的线程
令牌桶算法
概念
- 令牌:相当于通行许可,需要多少qps或资源消耗速率,就需要获取到多少令牌
- 桶:决定了令牌个数上限,即处理能力上限
- 令牌生成速率:每秒往桶里补充多少令牌
- 可用令牌大小:不超过桶大小
令牌桶算法,类似于生产者/消费者模式。桶相当于队列,令牌相当于任务,往桶里补充令牌相当于生产者,获取令牌相当于消费者。但不用真的生成线程、队列等实体,而是通过比较请求与可用令牌的大小、以及计算下一个令牌的发放时间,来实现的。
特点
- 场景通用,允许突发流量
- 理解起来不够直观,但代码实现相对简单
关键流程
不用定时补充令牌,而是类似于懒加载的方式,在调用时重新计算可用的令牌数。
- tryAquire方法:
- 先补充令牌:重新计算可用令牌数
- 比较可用令牌数、与请求许可数:如果前者大则扣减请求许可数、更新可用令牌数,并返回成功,否则返回失败
- aquire方法:
- 如果请求许可数 <= 可用令牌数:则扣减请求许可数、更新可用令牌数,并返回成功
- 否则,计算需要等待的时间:根据所需的令牌差额,乘以令牌的生成间隔时间,得到需等待的时间,即
(permits - tokens) * refillIntervalNanos。使用condition.awaitNanos方法,来实现阻塞等待。例如:桶容量300,每秒生成100个令牌,则令牌的生成间隔时间为 1秒 / 100 = 10毫秒,当前可用令牌为50,而请求许可为200,则需等待 (200 - 50) * 10毫秒 = 1500毫秒 - 重新补充令牌
- 上述步骤循环往复,直到获取到令牌
最关键的补充令牌逻辑:
- 计算需补充的令牌个数:根据自从上次补充以来的时间间隔,除以令牌的生成间隔时间,得到需补充的令牌个数,即
(now - lastRefillTime) / refillIntervalNanos - 更新可用令牌大小:不能超过桶容量大小,即在 capacity和tokens + tokensToAdd中取较小的那个
- 通知唤醒等待的线程:通过condition.signalAll()实现
以下是带注释的代码示例。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;/*** 令牌桶限流器 - 支持非阻塞和阻塞获取令牌*/
public class TokenBucketRateLimiter {// 桶容量大小private final long capacity;// 令牌生成速率(每秒生成的令牌数)private final double refillRate;// 每个令牌的生成间隔(纳秒),等于 1秒对应的纳秒 除以 令牌生成速率private final long refillIntervalNanos;// 可用令牌大小private double tokens;// 最后刷新时间(纳秒)private long lastRefillTime;// 锁和条件变量,用于阻塞等待private final ReentrantLock lock;private final Condition condition;/*** 构造方法*/public TokenBucketRateLimiter(long capacity, double refillRate) {if (capacity <= 0 || refillRate <= 0) {throw new IllegalArgumentException("容量和生成速率必须大于0");}this.capacity = capacity;this.refillRate = refillRate;this.refillIntervalNanos = (long) (1_000_000_000.0 / refillRate);this.tokens = capacity;this.lastRefillTime = System.nanoTime();this.lock = new ReentrantLock();this.condition = lock.newCondition();}/*** 非阻塞方式尝试获取一个令牌* @return 成功获取返回true,否则返回false*/public boolean tryAcquire() {return tryAcquire(1);}/*** 非阻塞方式尝试获取指定数量的令牌* @param permits 请求的令牌数量* @return 成功获取返回true,否则返回false*/public boolean tryAcquire(long permits) {if (permits <= 0) {throw new IllegalArgumentException("请求的令牌数必须大于0");}if (permits > capacity) {return false; // 请求超过桶容量,直接拒绝}lock.lock();try {// 重新补充令牌refillTokens();if (tokens >= permits) {tokens -= permits;return true;}return false;} finally {lock.unlock();}}/*** 阻塞方式获取一个令牌* @throws InterruptedException 如果等待过程中线程被中断*/public void acquire() throws InterruptedException {acquire(1);}/*** 阻塞方式获取指定数量的令牌(无限等待)* @param permits 请求的令牌数量* @throws InterruptedException 如果等待过程中线程被中断*/public void acquire(long permits) throws InterruptedException {if (permits <= 0) {throw new IllegalArgumentException("请求的令牌数必须大于0");}if (permits > capacity) {throw new IllegalArgumentException("请求的令牌数超过桶容量");}lock.lock();try {while (tokens < permits) {// 计算需要等待的时间double neededTokens = permits - tokens;long waitNanos = (long) (neededTokens * refillIntervalNanos);// 等待直到有足够令牌或超时condition.awaitNanos(waitNanos);// 重新补充令牌refillTokens();}tokens -= permits;} finally {lock.unlock();}}/*** 补充令牌(必须在锁内调用)*/private void refillTokens() {long now = System.nanoTime();// 计算需要补充的令牌个数double tokensToAdd = (now - lastRefillTime) / (double) refillIntervalNanos;if (tokensToAdd > 0) {// 可用令牌大小,最大不能超过桶容量大小tokens = Math.min(capacity, tokens + tokensToAdd);lastRefillTime = now;// 如果有等待的线程,通知它们if (tokens > 0) {condition.signalAll();}}}}
窗口计数算法
概念
- 时间窗口:一段时间范围,可以是秒,也可以是分钟、小时、天等。窗口限流值,就代表该时间窗口最大允许的请求数
- 格子:把时间窗口拆分到更细粒度,比如把1秒拆成10个100毫秒,每个格子有开始时间、结束时间
- 格子计数,就是格子对应时间范围的请求计数
- 窗口总计数,就是所有格子计数的总和
- 固定窗口计数:时间窗口固定,在临界情况下可能出现qps尖刺。例如:每秒限流100,在第一个1秒格子的第900毫秒请求许可80次(通过),在第二个1秒格子的第200毫秒请求许可70次(通过),但是整体看900毫秒到1200毫秒,这300毫秒内已经请求了150次,超过了限流100的阈值、存在qps尖刺
- 滑动窗口计数:滑动窗口有起始时间点、终止时间点,随着时间流逝一起动态往后推移,滑动窗口限流就是统计该时间范围内的请求许可是否超过阈值
一般用环形数组实现,数组的每个元素就是格子计数器,循环复用。
因为滑动窗口使用了更细粒度的格子,所以限流qps相对于固定窗口,会更平滑。
特点
- 追求严格的限流,不允许突发流量
- 理解起来容易,但是代码实现较复杂,需要维护更新格子计数
关键流程:
以滑动窗口计数为例,下面的关键流程是阅读代码后提炼出来的。
- tryAquire方法:
- 首先更新过期的格子:把过期格子的计数清零,同时更新格子的开始、结束时间
- 检查当前窗口总计数是否超过限制,是则返回false,代表被限流;否则更新当前格子的计数、以及窗口总计数,返回成功
- aquire方法:
- 首先更新过期的格子,然后检查当前窗口总计数是否超过限制,是则计算需要等待的时间,否则更新当前格子的计数、以及窗口总计数,返回成功
- 其中,计算需要等待的时间:
- 至少等待的时间:当前窗口中最早的格子多久后过期,则需要至少等待多久
- 至多等待的时间:如果窗口总计数减掉最早格子计数,仍然小于请求许可,说明即使最早格子过期,仍然会限流,则至多等待一个格子的时间,待下一轮循环再判断
- 使用condition.awaitNanos方法,来实现阻塞等待
- 上述步骤循环往复,直到满足条件
- 清理过期的格子时,如果发现窗口总计数小于限流值,则通知唤醒等待的线程,通过condition.signalAll()实现。
以下是带注释的代码示例。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Arrays;/*** 滑动窗口限流器 - 支持非阻塞和阻塞获取*/
public class SlidingWindowRateLimiter {// 时间窗口大小(毫秒)private final long windowSizeMs;// 窗口内允许的最大请求数private final int maxRequests;// 格子数量private final int slotCount;// 每个格子的时间长度(毫秒)private final long slotSizeMs;// 滑动窗口数组,每个元素代表一个时间格子的请求计数private final AtomicInteger[] slots;// 每个格子对应的开始时间(毫秒)private final long[] slotStartTimes;// 当前窗口的总请求数private final AtomicInteger totalRequests;// 锁和条件变量,用于阻塞等待private final ReentrantLock lock;private final Condition condition;/*** 构造函数* @param maxRequests 时间窗口内允许的最大请求数* @param windowSizeMs 时间窗口大小(毫秒)* @param slotCount 窗口分割的格子数量*/public SlidingWindowRateLimiter(int maxRequests, long windowSizeMs, int slotCount) {if (maxRequests <= 0 || windowSizeMs <= 0 || slotCount <= 0) {throw new IllegalArgumentException("参数必须大于0");}this.maxRequests = maxRequests;this.windowSizeMs = windowSizeMs;this.slotCount = slotCount;this.slotSizeMs = windowSizeMs / slotCount;// 初始化滑动窗口this.slots = new AtomicInteger[slotCount];this.slotStartTimes = new long[slotCount];this.totalRequests = new AtomicInteger(0);long currentTime = System.currentTimeMillis();for (int i = 0; i < slotCount; i++) {slots[i] = new AtomicInteger(0);slotStartTimes[i] = currentTime - (slotCount - i) * slotSizeMs;}this.lock = new ReentrantLock();this.condition = lock.newCondition();}/*** 非阻塞方式尝试获取一个请求许可* @return 成功获取返回true,否则返回false*/public boolean tryAcquire() {return tryAcquire(1);}/*** 非阻塞方式尝试获取指定数量的请求许可* @param permits 请求的许可数量* @return 成功获取返回true,否则返回false*/public boolean tryAcquire(int permits) {if (permits <= 0) {throw new IllegalArgumentException("请求的许可数必须大于0");}if (permits > maxRequests) {return false; // 请求超过窗口容量,直接拒绝}lock.lock();try {// 滑动窗口,更新过期的格子slideWindow();// 检查当前窗口是否超过限制if (totalRequests.get() + permits <= maxRequests) {// 获取当前时间对应的格子索引long currentTime = System.currentTimeMillis();int currentSlotIndex = getCurrentSlotIndex(currentTime);// 更新当前格子的计数和总计数slots[currentSlotIndex].addAndGet(permits);totalRequests.addAndGet(permits);return true;}return false;} finally {lock.unlock();}}/*** 阻塞方式获取一个请求许可(无限等待)* @throws InterruptedException 如果等待过程中线程被中断*/public void acquire() throws InterruptedException {acquire(1);}/*** 阻塞方式获取指定数量的请求许可(无限等待)* @param permits 请求的许可数量* @throws InterruptedException 如果等待过程中线程被中断*/public void acquire(int permits) throws InterruptedException {if (permits <= 0) {throw new IllegalArgumentException("请求的许可数必须大于0");}if (permits > maxRequests) {throw new IllegalArgumentException("请求的许可数超过窗口容量");}lock.lock();try {while (!tryAcquireInternal(permits)) {// 计算需要等待的时间(直到下一个格子过期)long waitTimeMs = calculateWaitTime(permits);if (waitTimeMs > 0) {condition.await(waitTimeMs, TimeUnit.MILLISECONDS);} else {condition.await();}}} finally {lock.unlock();}}/*** 内部尝试获取方法(必须在锁内调用)*/private boolean tryAcquireInternal(int permits) {slideWindow();if (totalRequests.get() + permits <= maxRequests) {long currentTime = System.currentTimeMillis();int currentSlotIndex = getCurrentSlotIndex(currentTime);slots[currentSlotIndex].addAndGet(permits);totalRequests.addAndGet(permits);return true;}return false;}/*** 滑动窗口,清理过期的格子*/private void slideWindow() {long currentTime = System.currentTimeMillis();long windowStartTime = currentTime - windowSizeMs;for (int i = 0; i < slotCount; i++) {// 如果格子开始时间在窗口开始时间之前,说明这个格子已过期if (slotStartTimes[i] < windowStartTime) {int expiredCount = slots[i].getAndSet(0);if (expiredCount > 0) {totalRequests.addAndGet(-expiredCount);// 更新格子开始时间为当前周期开始时间slotStartTimes[i] = currentTime - (slotCount - i - 1) * slotSizeMs;}}}// 如果有等待的线程,通知它们if (totalRequests.get() < maxRequests) {condition.signalAll();}}/*** 获取当前时间对应的格子索引*/private int getCurrentSlotIndex(long currentTime) {return (int) ((currentTime / slotSizeMs) % slotCount);}/*** 计算需要等待的时间(毫秒)*/private long calculateWaitTime(int permits) {long currentTime = System.currentTimeMillis();long windowStartTime = currentTime - windowSizeMs;// 找到最早的非空格子long earliestSlotTime = Long.MAX_VALUE;for (int i = 0; i < slotCount; i++) {if (slots[i].get() > 0 && slotStartTimes[i] < earliestSlotTime) {earliestSlotTime = slotStartTimes[i];}}if (earliestSlotTime == Long.MAX_VALUE) {return 0; // 没有找到非空格子,不需要等待}// 计算最早格子过期的时间long earliestExpireTime = earliestSlotTime + windowSizeMs;long waitTime = earliestExpireTime - currentTime;// 考虑请求数量,可能需要等待更多时间int availableSlots = maxRequests - totalRequests.get();if (availableSlots <= 0) {waitTime = Math.max(waitTime, slotSizeMs);}return Math.max(0, waitTime);}/*** 获取当前窗口内的总请求数* @return 当前请求数*/public int getCurrentRequests() {lock.lock();try {slideWindow();return totalRequests.get();} finally {lock.unlock();}}
}
分布式限流
上面提到的令牌桶、窗口计数限流算法,都是单机实现,还可以通过redis lua脚本实现分布式限流。lua脚本可保证原子性,即多条redis命令要么都执行成功、要么都不生效。
固定窗口计数(分布式限流实现)
以固定窗口限流为例,因为可设置窗口key的过期时间,所以不用专门写对格子计数清零的逻辑,简单很多。
关键流程
- 首先,通过get命令,查询窗口key是否存在
- 如果存在,则判断是否超过阈值,未超过则通过incr命令自增、返回成功,否则返回限流
- 如果不存在,则通过set ex命令,设置初始值1、以及过期时间ttl
固定窗口限流的lua脚本示例如下:
-- 固定窗口限流脚本
-- KEYS[1]: 限流key
-- ARGV[1]: 时间窗口大小(秒)
-- ARGV[2]: 最大请求次数local key = KEYS[1]
local window = tonumber(ARGV[1])
local maxCount = tonumber(ARGV[2])-- 获取当前计数
local current = redis.call('GET', key)
if current == false then-- key不存在,设置初始值并设置过期时间redis.call('SET', key, 1, 'EX', window)return 1
else-- key存在,检查是否超过限制local count = tonumber(current)if count < maxCount then-- 未超过限制,计数+1redis.call('INCR', key)return 1else-- 超过限制return 0end
end
总结
理解限流算法的原理,有助于做出合适的方案选型。
回顾上一篇文章介绍的限流方案:
- 令牌桶算法允许突发流量,实现较轻量,适用场景最通用:Guava RateLimiter、Sentinel、Redisson RateLimiter都采用令牌桶算法、或变种实现
- 窗口计数算法强调平滑限流,Sentinel也内置了滑动窗口计数算法
- 漏桶算法,只能按固定速率处理、不够灵活,只在特定严格平滑限流的场景下适用