开源限流组件分析(三):golang-time/rate

文章目录

  • 本系列
  • 前言
  • 提供获取令牌的API
  • 数据结构
  • 基础方法
    • tokensFromDuration
    • durationFromTokens
    • advance
  • 获取令牌方法
    • reverseN
    • 其他系列API
  • 令人费解的CancelAt
    • 是bug吗
  • 取消后无法唤醒其他请求

本系列

  • 开源限流组件分析(一):juju/ratelimit
  • 开源限流组件分析(二):uber-go/ratelimit
  • 开源限流组件分析(三):golang-time/rate (本文)

前言

根据开源限流组件分析(一):juju/ratelimit的分析,令牌桶限流的实现没必要真的准备一个桶,定时往里塞令牌,然后每次获取令牌时从桶中弹出,但这样做对内存不友好,需要开辟桶最大容量大小的空间

最佳做法是利用 token 数可以和时间跨度相互转化的原理,只用维护一个桶中当前令牌数的变量,每次消费前才根据时间差计算并更新 Token 数目

本文将分析go官方库提供的限流工具的使用及实现细节,也是令牌桶算法。并对下面这个问题给出合理的解释:在CancelAt方法中,为啥归还令牌时不是归还所有令牌,而是要扣减一个值

本文基于源码:https://github.com/golang/time.git,版本:v0.7.0

提供获取令牌的API

  • Wait/WaitN:当没有足够的Token时,将在内部阻塞等待直到Token足够,或超时取消

    • 工程中推荐使用该方法,参数中有ctx,和ctx配合能优雅实现超时控制
  • Allow/AllowN:当前时刻没有足够的Token时,返回false。否则获取到令牌,返回成功

  • Reverse/ReverseN:当没有足够的Token时,返回 Reservation 对象,里面包含是否获取成功,以及要等多久才能放行请求

    • 可控性最好,可以选择等待到规定的时间然后放心请求,也可以调Reservation.Cancel() 取消等待,归还令牌

数据结构

type Limit float64type Limiter struct {mu sync.Mutex// 每秒产生多少个tokenlimit Limit// 桶大小burst int// 桶中剩余的token数,可以为负数,表示被某些请求预定了一些未来的令牌tokens float64// 最近一次推进令牌桶的时间last time.Time// 令牌可以满足最近一次请求的时间lastEvent time.Time
}

特别说明这两个字段:

  • last:表示最近一次推进令牌桶的时间,也就是每次调获取令牌api的时间,因此每次获取令牌都会推进令牌桶的时间

    • 这样每次获取令牌时,都会看从last到now这段时间之间产生了多少新的令牌
  • lastEvent:表示令牌桶中最近一次请求能放行的时间。如果当前时间调获取令牌api时就能满足,就是当前时间,否则是未来的某个时间

    • 只用于CancelAt计算归还多少个令牌,可以到后面再理解

基础方法

tokensFromDuration

计算在时长d内,可以生成多少个令牌

也就是 速度 * 时间 = 总量

func (limit Limit) tokensFromDuration(d time.Duration) float64 {if limit <= 0 {return 0}return d.Seconds() * float64(limit)
}

durationFromTokens

计算生成tokens个令牌,需要多少时间

也就是 总数 / 速度 = 时间

func (limit Limit) durationFromTokens(tokens float64) time.Duration {if limit <= 0 {return InfDuration}seconds := tokens / float64(limit)return time.Duration(float64(time.Second) * seconds)
}

advance

计算如果时间推进到t后,桶中可用的令牌个数

注意:该方法只计算,不会更新桶中的令牌数

func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {last := lim.lastif t.Before(last) {last = t}// 过去了多长时间elapsed := t.Sub(last)// 计算过去这段时间,一共产生多少tokendelta := lim.limit.tokensFromDuration(elapsed)tokens := lim.tokens + delta// tokens不能超过最大容量if burst := float64(lim.burst); tokens > burst {tokens = burst}return t, tokens
}

获取令牌方法

reverseN

Wait系列,Allow系列,Reserve系列的方法,底层都调了reverseN,其流程如下:

  1. 计算如果时间推进到now,桶中可用的 token 数

  2. 计算本次消费后,桶还能剩能下多少token

  3. 如果 token < 0, 说明目前的token不够,需要等待一段时间

  4. 只有同时满足下面两个条件时,才获取令牌成功

    1. n <= lim.burst:申请的token数量没有超过了桶的容量大小
    2. waitDuration <= maxFutureReserve: 需要等待的时间 <=用户期望的时间

注意如果桶中令牌数不够,需要等待直到桶中令牌产生为止

此时需要将桶中令牌数置为负值,表示有请求欠了账,占了位

如果接下来有别的请求来获取令牌,会在之前欠账的基础上继续欠账,让令牌数的负值更大,需要等待更长的时间

通过时间流逝来还这些债,这一点和另一个令牌桶开源限流组件分析(一):juju/ratelimit很类似

// 当前时t时刻,想获取n个令牌,最多等maxFutureReserve时间
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {lim.mu.Lock()defer lim.mu.Unlock()// 可以忽略这个分支,因为用了限流器就肯定要有速率限制if lim.limit == Inf {return Reservation{ok:        true,lim:       lim,tokens:    n,timeToAct: t,}}// 如果时间推进到now,可用的 token 数t, tokens := lim.advance(t)// 本次消费后,桶还能剩能下多少tokentokens -= float64(n)// 如果 token < 0, 说明目前的token不够,需要等待一段时间var waitDuration time.Durationif tokens < 0 {waitDuration = lim.limit.durationFromTokens(-tokens)}// n <= lim.burst:申请的 token 没有超过了桶的大小// waitDuration <= maxFutureReserve: 需要等待的时间 <=用户期望的时间ok := n <= lim.burst && waitDuration <= maxFutureReserver := Reservation{ok:    ok,lim:   lim,limit: lim.limit,}// 能获取到令牌if ok {// 本次获取了多少tokenr.tokens = n// 表示需要等待到这个时刻才能获得期望数量的token(当然 waitDuration 有可能为 0,就是立即满足,timeToAct就是now)r.timeToAct = t.Add(waitDuration)// 更新推进令牌桶的时间lim.last = t// 就扣减令牌,更新桶中token的数量lim.tokens = tokens// 桶中可以满足最近一次请求的时间lim.lastEvent = r.timeToAct}return r
}

其他系列API

allowN:调reserveN时传maxFutureReserve=0,表示只有不等待就能获得时,才获取令牌

func (lim *Limiter) AllowN(t time.Time, n int) bool {return lim.reserveN(t, n, 0).ok
}

ReserveN:直接调reserveN

func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {r := lim.reserveN(t, n, InfDuration)return &r
}

WaitN:根据ctx计算要获取令牌最多等待多久,并以这个时间最为最大等待时间调reverseN。然后在内部sleep

  1. 先检查ctx是否已取消,如果已取消直接返回

  2. 根据ctx还剩多久过期,来决定在reverseN中最多等待多久

    1. 换句话说,如果判断直到ctx过期都无法获取令牌,就不等了
  3. 获取到Reservation后,看 1)根据Reservation计算还有多久能获取令牌的时间delay,和2) ctx被取消,这两件事谁先发生:

    1. 如果delay先过完:获取令牌成功,可以执行请求
    2. 如果ctx先被取消:本次获取令牌失败,需要尽可能归还桶中的令牌

正常来说,只要不是外部主动取消,都是delay先过完

func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {// 其实就是:time.NewTimer(d)newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {timer := time.NewTimer(d)return timer.C, timer.Stop, func() {}}return lim.wait(ctx, n, time.Now(), newTimer)
}func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {lim.mu.Lock()burst := lim.burstlimit := lim.limitlim.mu.Unlock()// 不能超过桶容量if n > burst && limit != Inf {return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)}// 先检查下ctx是否已取消select {case <-ctx.Done():return ctx.Err()default:}// 获取令牌的最大等待时间根据ctx计算waitLimit := InfDurationif deadline, ok := ctx.Deadline(); ok {waitLimit = deadline.Sub(t)}r := lim.reserveN(t, n, waitLimit)if !r.ok {return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)}// 该Reservation还需要等多久才能获得令牌delay := r.DelayFrom(t)if delay == 0 {return nil}ch, stop, _ := newTimer(delay)defer stop()select {case <-ch:// ctx超时或被取消之前,就获得令牌了return nilcase <-ctx.Done():// 在获得令牌之前,ctx就被取消了,需要归还令牌到桶中r.Cancel()return ctx.Err()}
}

令人费解的CancelAt

最后看看CancelAt:取消本次持有,尽可能归还持有的令牌

为啥需要归还令牌?因此本次实际上没有放行请求,而是被cancel掉了,可以把令牌放回桶中,供接下来的请求使用

// 参数t一般是当前时间
func (r *Reservation) CancelAt(t time.Time) {if !r.ok {return}r.lim.mu.Lock()defer r.lim.mu.Unlock()/**1.如果产生令牌的速率无限大,那没必要归还令牌2.自己没有拿到token,也不用归还3.预期获得时间比当前时间早,这里认为不用归还*/if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {return}// 如果只有有更新的请求请求了令牌,要归还的令牌数减去// r.lim.lastEvent.Sub(r.timeToAct) 这期间产生的令牌restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))if restoreTokens <= 0 {return}// 推进桶的时间t, tokens := r.lim.advance(t)tokens += restoreTokensif burst := float64(r.lim.burst); tokens > burst {tokens = burst}r.lim.last = t// 归还tokenr.lim.tokens = tokens// 如果相等,说明后面没有新的token消费if r.timeToAct == r.lim.lastEvent {// 上一次的lastEventprevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))if !prevEvent.Before(t) {// 恢复lastEvent为上一次的lastEventr.lim.lastEvent = prevEvent}}
}

是bug吗

其中比较费解的是这一行:为啥要多减去这一部分

restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
  • r.TimeToAct:本次Reservation可以放行请求的时间,也是能使用令牌的时间
  • r.lim.lastEvent:令牌桶中,最近一次请求的TimeToAct

在归还令牌时,要减掉这段时间范围能产生的令牌数

对应于下面这个场景:

在这里插入图片描述

这行代码有注释:

// calculate tokens to restore
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
// after r was obtained. These tokens should not be restored.

但只是解释了在干啥:在r之后获取的令牌不应该被重新放回桶中

而没有解释为啥要这么干


有人在stackoverflow提问,归还所有持有的token是否应该改成restoreTokens = float64(r.tokens)

在这里插入图片描述

也有人在官方仓库提issue,举了这样一个例子:

func TestLimit(t *testing.T) {t0 := time.Now()t1 := time.Now().Add(100 * time.Millisecond)t2 := time.Now().Add(200 * time.Millisecond)t3 := time.Now().Add(300 * time.Millisecond)// 桶每秒产生10个令牌,容量=20l := NewLimiter(10, 20)l.ReserveN(t0, 15) // 桶里还剩 5 个 tokenfmt.Printf("%+v\n", l)// t1:时间过去100ms,桶里增加1个,变成6个// 消费10个,变成-4个r := l.ReserveN(t1, 10)// 此时timeToAct:500msfmt.Printf("%+v\n", l)// t2:时间过去100ms,桶里增加1个,变成-3个// 消费2个, 桶里变成-5个l.ReserveN(t2, 2)// 此时timeToAct:700msfmt.Printf("%+v\n", l)// t3: 时间过去100ms,桶里增加1个,变成-4个// r在t3取消,变成4(归还8个)个,按常理是变成6(归还10个)个r.CancelAt(t3)fmt.Printf("%+v\n", l)
}

最后一行:

  • 按常理来说,应该把所有令牌都归还,也就是归还10个,这样桶中的令牌数变成6
  • 实际上只归还了8个,桶中的令牌数变成4。就是那行令人费解的代码造成的

有人在issue下面给出解释:

在这里插入图片描述

意思就是:如果把保留的令牌数全部还回去的话,会造成在某一时刻突破令牌桶的速率限制

拿上面例子说明:

如果在t3时刻取消r时,如果把所有的令牌都归还了,也就是归还10个,此时桶中的令牌数变成6

那么再过400ms,在t2时刻获取的Reservation r2就可以开始执行,同时现在桶中有10个令牌!

如果此时有请求来获取这10个令牌,算上刚刚开始执行的r2,那就相当于同时有加起来需要消耗12个令牌的请求在执行!超过了最大容量10的限制


而按照源码中的逻辑扣减,在t3时刻取消r时,只归还8个,此时桶中的令牌数变成4

那么再过400ms,在t2时刻获取的Reservation r2就可以开始执行,同时现在桶中有8个令牌

如果此时有请求来获取这8个令牌,算上刚刚开始执行的r2,那就会同时加起来需要消耗10个令牌的请求在执行,刚好不超过桶中的最大容量的限制

也就是说,如果r后面有更新的请求rnew,rnew.TimeToAct赋值给了r.limit.lastEvent

那么从r.TimeToActr.limit.lastEvent之间能产生的令牌是不能归还的,而是要等时间流逝自然填充

这样当rnew可以执行时,桶中的令牌数 + rnew获取的令牌数才不会超过桶的容量


取消后无法唤醒其他请求

这个库有个小问题:如果请求A在请求B之前发生,都需要等待一段时间。如果任务A提前cancel了,使得桶中的令牌满足请求B的需求,请求B也不会被唤醒,而是等待其预定的时间到了再执行

例如下面的例子:

func TestBug(t *testing.T) {// 每100ms产生一个令牌,最大容量=10l := NewLimiter(10, 10)t1 := time.Now()// 先把桶中的初始令牌用完l.ReserveN(t1, 10)var wg sync.WaitGroupctx, cancel := context.WithTimeout(context.TODO(), time.Hour)defer cancel()wg.Add(1)wg.Add(2)go func() {defer wg.Done()// 如果要等,这个要等 1s才能执行,但是我们的 ctx 200ms 就会取消l.WaitN(ctx, 10)fmt.Printf("[1] cost: %s\n", time.Since(t1))}()go func() {defer wg.Done()// 模拟出现问题, 200ms就取消了time.Sleep(200 * time.Millisecond)cancel()}()time.Sleep(100 * time.Millisecond)go func() {defer wg.Done()// 正常情况下,这个要等 1.2 s 才能执行,// 但是我们前面都取消了,按理说在400ms时,桶中令牌数就够了,这个可以执行// 但实际上没有唤醒,等到1.2才执行ctx, cancel := context.WithTimeout(context.Background(), time.Hour)defer cancel()l.WaitN(ctx, 2)fmt.Printf("[2] cost: %s\n", time.Since(t1))}()wg.Wait()
}
  1. 时刻0ms:请求A获取10个token,需要等1000ms,桶中tokens = -10
  2. 时刻100ms:请求B获取2个token,需要等1200ms,桶中tokens = -11
  3. 时刻200ms:请求A取消,往桶中归还8个token,桶中tokens = -2
  4. 时刻400ms:理想情况下,此时请求B可以执行,因为桶中容量够了
  5. 时刻1.2s:实际上,请求B此时才可以执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/57152.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java之继承抽象类用法实例(三十一)

简介&#xff1a; CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布&#xff1a;《Android系统多媒体进阶实战》&#x1f680; 优质专栏&#xff1a; Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a; 多媒体系统工程师系列【…

一次文件重写后,文件修改时间未发生变化的原因

昨天用python写了一个脚本&#xff0c;用于统计店铺的某些数据&#xff0c;然后将数据存储在文本里&#xff0c;定时每天早上执行。 昨天傍晚写完的&#xff0c;执行了一下&#xff0c;执行完成后&#xff0c;此时文本修改时间为2024-10-22 18:00。 早上来公司一看&#xff0…

git命令笔记(速查速查)

git命令功能总结 1.创建git的本地仓库2. 配置本地仓库(name和email地址)3. 工作区、版本库、暂存区、对象区3.1 add, commit3.2 打印提交日志3.2 修改文件 4.版本回退&#xff08;git reset&#xff09;5. 撤销修改&#xff08;在push之前撤销&#xff09;6.删除版本库中的文件…

SQL Injection | SQL 注入分类 —— 查询方式

关注这个漏洞的其他相关笔记&#xff1a;SQL 注入漏洞 - 学习手册-CSDN博客 在进行 SQL 注入攻击时&#xff0c;如果目标服务器的后端 SQL 查询不返回任何结果&#xff08;即无回显&#xff09;&#xff0c;我们可能需要采用 SQL 盲注技术来进行进一步的测试。盲注是一种在无法…

SSM框架学习(七、MyBatis-Plus高级用法:最优化持久层开发)

目录 一、MyBatis-Plus快速入门 1.简介 2.快速入门 二、MyBatis-Plus核心功能 1.基于Mapper接口CRUD &#xff08;1&#xff09;Insert 方法 &#xff08;2&#xff09;Delete方法 &#xff08;3&#xff09;Update 方法 &#xff08;4&#xff09;Select方法 2.基于Serv…

用PHP写一个EACO(drc20)写一个和狗狗币,比特币,avax, bnb,eth,sol,usdt,等全球前30大数字货币的兑换去中心化小程序。

创建一个简单的PHP小程序&#xff0c;用于EACO&#xff08;DRC20&#xff09;与全球前30大数字货币&#xff08;如狗狗币、比特币、AVAX、BNB、ETH、SOL、USDT等&#xff09;进行去中心化兑换的功能&#xff0c;需要集成加密货币的API接口来获取实时汇率和执行兑换操作。以下是…

Chrome DevTools 三: Performance 性能面板扩展—— 性能优化

Performance 性能 &#xff08;一&#xff09;性能指标 首次内容绘制 (First Contentful Paint&#xff0c;FCP)&#xff1a; 任意内容在页面上完成渲染的时间 最大内容绘制 (Largest Contentful Paint&#xff0c;LCP)&#xff1a; 最大内容在页面上完成渲染的时间 第一字节…

《中国结算全国股份转让系统—结算参与人数据接口规范》

《中国结算全国股份转让系统—结算参与人数据接口规范》 本文档在原《中国结算全国股份转让系统结算参与人数据接口规范&#xff08;Ver1.2&#xff09;》基础上&#xff0c;依据《关于通过中国结算缴纳身份验证费的业务说明》、《关于通过中国结算缴纳手机号码核查费的业务说明…

283.移动零

目录 题目解法解释&#xff1a; .reverse()怎么用的&#xff1f;Char 13: error: no matching function for call to reverse 什么是双指针&#xff1f;双指针的常见类型&#xff1a;总结&#xff1a; 题目 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末…

【Java函数篇】Java8中的Lambda表达式实战详解

文章标题 什么是lambda表达式Lambda表达式示例Lambda表达式特点更多实战案例场景1&#xff1a;使用 lambda 表达式迭代列表并对列表项执行某些操作场景2&#xff1a;使用 lambda 表达式在 Java 中创建并启动线程场景3&#xff1a;使用 lambda 表达式向 GUI 组件添加事件监听器 …

88.【C语言】文件操作(5)

目录 文件的随机读写 1.fseek函数 代码示例 运行结果 2.ftell函数 代码示例 运行结果 3.rewind函数 代码示例 运行结果 承接79.【C语言】文件操作(4)文章 文件的随机读写 1.fseek函数 声明:int fseek ( FILE * stream, long int offset, int origin ); 格式:fsee…

APM 3.0.0|二次元味很冲的B站音乐软件

APM是一款专为B站音频设计的第三方播放器&#xff0c;支持从B站获取音频内容&#xff0c;提供桌面小组件&#xff0c;多语言支持&#xff0c;以及针对Android系统的优化。下载安装APK后打开应用&#xff0c;登录B站账号&#xff0c;浏览并播放音频内容。 大小&#xff1a;73M …

13分+文章利用scRNA-Seq揭示地铁细颗粒物引起肺部炎症的分子机制

写在前面 人们乘坐地铁时&#xff0c;不可避免地在地铁站台上吸入细颗粒物&#xff08;PM2.5&#xff09;&#xff0c;但PM2.5对人体又有哪些危害呢&#xff0c;今天和大家分享一篇文章&#xff0c;题目为“单细胞转录组学揭示吸入地铁细颗粒物引起的肺部炎症”&#xff0c;作…

Android:加载三方应用的小部件到自己APP显示

两种方式&#xff1a; 1、自己加载小部件列表做选择要显示的小部件 2、调用系统的弹窗做选择要显示的小部件 直接贴代码&#xff1a; public class TempActivity extends FragmentActivity {private ActivityTempBinding viewBinding;private AppWidgetManager appWidgetMa…

私域卖货难点如何解决

明确了品牌卖货的本质&#xff0c;我们来看一下私域品牌的卖货难点如何解决。 一、产品层面 想要提高卖货效率&#xff0c;第一步就是打造产品竞争力。产品竞争力的打造主要在于两点&#xff1a;市场调研和定期更新迭代&#xff0c;其中定期更新迭代则是打造产品竞争力的核心环…

《AI生成式工具使用》之:AI文本生视频(二战!)

目录 背景说明及目标 尝试练手 1、豆包AI之图片生成 总结&#xff1a;豆包AI生成的图片&#xff0c;不太能看细节&#xff0c;涉及到中文的基本上不能细看都是类似乱码的东西&#xff0c;有明显的逻辑性问题&#xff08;比如不符合道路交规&#xff09;。需要根据生成的结果…

超越 React Query:探索更高效的数据请求策略

你好&#xff0c;开发者们&#xff01; 在前端开发的海洋中&#xff0c;我们常常遇到组件间通信的难题。你是否也曾为如何优雅地在组件间传递信息而头疼&#xff1f;今天&#xff0c;我想和大家分享一个让我眼前一亮的解决方案——使用 alova。 跨组件触发请求的挑战 想象一…

Java-继承与多态-上篇

关于类与对象&#xff0c;内容较多&#xff0c;我们分为两篇进行讲解&#xff1a; &#x1f4da; Java-继承与多态-上篇&#xff1a;———— <就是本篇> &#x1f4d5; 继承的概念与使用 &#x1f4d5; 父类成员访问 &#x1f4d5; super关键字 &#x1f4d5; supe…

Windows设置程序开机自启动的几种方法

1. 使用“启动”文件夹 Windows 有一个专门的“启动”文件夹&#xff0c;所有放在这个文件夹中的程序都会在系统启动时自动运行。 步骤&#xff1a; 按下 Win R&#xff0c;输入 shell:startup&#xff0c;并按下回车。 在弹出的启动文件夹中&#xff0c;将你想要开机自启动…

laravel 查询数据库

数据库准备 插入 三行 不同的数据 自行搭建 laravel 工程 参考 工程创建点击此处 laravel 配置 数据库信息 DB_CONNECTIONmysql #连接什么数据库 DB_HOST127.0.0.1 # 连接 哪个电脑的 ip &#xff08;决定 电脑 本机&#xff09; DB_PORT3306 # 端口 DB_DATABASEyanyu…