一文聊聊go语言中的限流漏桶和令牌桶库

本篇文章带大家聊聊go语言中的限流漏桶和令牌桶库,介绍令牌桶和漏桶的实现原理以及在实际项目中简单应用。

一文聊聊go语言中的限流漏桶和令牌桶库

为什么需要限流中间件?

在大数据量高并发访问时,经常会出现服务或接口面对大量的请求而导致数据库崩溃的情况,甚至引发连锁反映导致整个系统崩溃。或者有人恶意攻击网站,大量的无用请求出现会导致缓存穿透的情况出现。使用限流中间件可以在短时间内对请求进行限制数量,起到降级的作用,从而保障了网站的安全性。

应对大量并发请求的策略?

使用消息中间件进行统一限制(降速)

使用限流方案将多余请求返回(限流)

立即学习“go语言免费学习笔记(深入)”;

升级服务器

缓存(但仍然有缓存穿透等危险)

等等

可以看出在代码已经无法提升的情况下,只能去提升硬件水平。或者改动架构再加一层!也可以使用消息中间件统一处理。而结合看来,限流方案是一种既不需要大幅改动也不需要高额开销的策略。

常见的限流方案

令牌桶算法

漏桶算法

滑动窗口算法

等等

漏桶

引入ratelimit库

go get -u go.uber.org/ratelimit

库函数源代码

 // New returns a Limiter that will limit to the given RPS. func New(rate int, opts ...Option) Limiter {     return newAtomicBased(rate, opts...) }  // newAtomicBased returns a new atomic based limiter. func newAtomicBased(rate int, opts ...Option) *atomicLimiter {     // TODO consider moving config building to the implementation     // independent code.     config := buildConfig(opts)     perRequest := config.per / time.Duration(rate)     l := &atomicLimiter{         perRequest: perRequest,         maxSlack:   -1 * time.Duration(config.slack) * perRequest,         clock:      config.clock,     }      initialState := state{         last:     time.Time{},         sleepFor: 0,     }     atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))     return l }

登录后复制

该函数使用了函数选项模式多个结构体对象进行初始化

根据传入的值来初始化一个桶结构体 rate 为int 传参 。

初始化过程中包括了

每一滴水需要的时间 perquest = config.per / time.Duration(rate)maxSlack 宽松度(宽松度为负值)-1 * time.Duration(config.slack) * perRequest 松紧度是用来规范等待时间的

 // Clock is the minimum necessary interface to instantiate a rate limiter with // a clock or mock clock, compatible with clocks created using // github.com/andres-erbsen/clock. type Clock interface {    Now() time.Time    Sleep(time.Duration) }

登录后复制

同时还需要结构体Clock来记录当前请求的时间now和此刻的请求所需要花费等待的时间sleep

 type state struct {    last     time.Time    sleepFor time.Duration }

登录后复制

state 主要用来记录上次执行的时间以及当前执行请求需要花费等待的时间(作为中间状态记录)

最重要的Take逻辑

 func (t *atomicLimiter) Take() time.Time {    var (       newState state       taken    bool       interval time.Duration    )    for !taken {       now := t.clock.Now()        previousStatePointer := atomic.LoadPointer(&t.state)       oldState := (*state)(previousStatePointer)        newState = state{          last:     now,          sleepFor: oldState.sleepFor,       }       if oldState.last.IsZero() {          taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))          continue       }       // 计算是否需要进行等待取水操作       newState.sleepFor += t.perRequest(每两滴水之间的间隔时间) - now.Sub(oldState.last)(当前时间与上次取水时间的间隔)                // 如果等待取水时间特别小,就需要松紧度进行维护       if newState.sleepFor  0 {          newState.last = newState.last.Add(newState.sleepFor)          interval, newState.sleepFor = newState.sleepFor, 0       }       taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))    }    t.clock.Sleep(interval)    // 最后返回需要等待的时间     return newState.last }

登录后复制

实现一个Take方法

该Take方法会进行原子性操作(可以理解为加锁和解锁),在大量并发请求下仍可以保证正常使用。

记录下当前的时间 now := t.clock.Now()

oldState.last.IsZero()判断是不是第一次取水,如果是就直接将state结构体中的值进行返回。而这个结构体中初始化了上次执行时间,如果是第一次取水就作为当前时间直接传参。

如果 newState.sleepFor 非常小,就会出现问题,因此需要借助宽松度,一旦这个最小值比宽松度小,就用宽松度对取水时间进行维护。

如果newState.sleepFor > 0 就直接更新结构体中上次执行时间newState.last = newState.last.Add(newState.sleepFor)并记录需要等待的时间interval, newState.sleepFor = newState.sleepFor, 0。

如果允许取水和等待操作,那就说明没有发生并发竞争的情况,就模拟睡眠时间t.clock.Sleep(interval)。然后将取水的目标时间进行返回,由服务端代码来判断是否打回响应或者等待该时间后继续响应。

t.clock.Sleep(interval)

 func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }

登录后复制

实际上在一个请求来的时候,限流器就会进行睡眠对应的时间,并在睡眠后将最新取水时间返回。

实际应用(使用Gin框架)

 func ratelimit1() func(ctx *gin.Context) {     r1 := rate1.New(100)     return func(ctx *gin.Context) {         now := time.Now()         //  Take 返回的是一个 time.Duration的时间         if r1.Take().Sub(now) > 0 {             // 返回的时间比当前的时间还大,说明需要进行等待             // 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行             // 如果不需要等待请求时间,就直接进行Abort 然后返回             response(ctx, http.StatusRequestTimeout, "rate1 limit...")             fmt.Println("rate1 limit...")             ctx.Abort()             return         }         // 放行         ctx.Next()     } }

登录后复制

这里你可以进行选择是否返回。因为Take一定会执行sleep函数,所以当执行take结束后表示当前请求已经接到了水。当前演示使用第一种情况。

如果你的业务要求响应不允许进行等待。那么可以在该请求接完水之后然后,如上例。

如果你的业务允许响应等待,那么该请求等待对应的接水时间后进行下一步。具体代码就是将if中的内容直接忽略。(建议使用)

测试代码

这里定义了一个响应函数和一个handler函数方便测试

 func response(c *gin.Context, code int, info any) {    c.JSON(code, info) }  func pingHandler(c *gin.Context) {    response(c, 200, "ping ok~") }

登录后复制

执行go test -run=Run -v先开启一个web服务

 func TestRun(t *testing.T) {    r := gin.Default()     r.GET("/ping1", ratelimit1(), pingHandler)    r.GET("/ping2", ratelimit2(), helloHandler)     _ = r.Run(":4399") }

登录后复制

使用接口压力测试工具go-wrk进行测试->tsliwowicz/go-wrk: go-wrk)

golang引入install版本可以直接通过go install github.com/tsliwowicz/go-wrk@latest下载

使用帮助

    Usage: go-wrk      Options:     -H       Header to add to each request (you can define multiple -H flags) (Default )     -M       HTTP method (Default GET)     -T       Socket/request timeout in ms (Default 1000)     -body    request body string or @filename (Default )     -c       Number of goroutines to use (concurrent connections) (Default 10)     -ca      CA file to verify peer against (SSL/TLS) (Default )     -cert    CA certificate file to verify peer against (SSL/TLS) (Default )     -d       Duration of test in seconds (Default 10)     -f       Playback file name (Default )     -help    Print help (Default false)     -host    Host Header (Default )     -http    Use HTTP/2 (Default true)     -key     Private key file name (SSL/TLS (Default )     -no-c    Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)     -no-ka   Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)     -no-vr   Skip verifying SSL certificate of the server (Default false)     -redir   Allow Redirects (Default false)     -v       Print version details (Default false)

登录后复制

-t 8个线程 -c 400个连接 -n 模拟100次请求 -d 替换-n 表示连接时间

输入go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1

可以稍微等待一下水流积攒(压测速度过快)。

image.png可以看出,89个请求全部返回。也就是说在一段请求高峰期,不会有请求进行响应。因此我认为既然内部已经睡眠,那么就也就应该对请求放行处理。

令牌桶

引入ratelimit库

go get -u github.com/juju/ratelimit

初始化

 // NewBucket returns a new token bucket that fills at the // rate of one token every fillInterval, up to the given // maximum capacity. Both arguments must be // positive. The bucket is initially full. func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {    return NewBucketWithClock(fillInterval, capacity, nil) }  // NewBucketWithClock is identical to NewBucket but injects a testable clock // interface. func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {    return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock) }

登录后复制

进行Bucket桶的初始化。

 func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {    if clock == nil {       clock = realClock{}    }     // 填充速率    if fillInterval  0")    }     // 最大令牌容量    if capacity  0")    }     // 单次令牌生成量    if quantum  0")    }    return &Bucket{       clock:           clock,       startTime:       clock.Now(),       latestTick:      0,       fillInterval:    fillInterval,       capacity:        capacity,       quantum:         quantum,       availableTokens: capacity,    } }

登录后复制

令牌桶初始化过程,初始化结构体 fillInterval(填充速率) cap(最大令牌量) quannum(每次令牌生成量)。

如果三个变量有一个小于或者等于0的话直接进行报错返回。在最开始就将当前令牌数初始化为最大容量

调用

 // TakeAvailable takes up to count immediately available tokens from the // bucket. It returns the number of tokens removed, or zero if there are // no available tokens. It does not block. func (tb *Bucket) TakeAvailable(count int64) int64 {    tb.mu.Lock()    defer tb.mu.Unlock()    return tb.takeAvailable(tb.clock.Now(), count) }

登录后复制

调用TakeAvailable函数,传入参数为需要取出的令牌数量,返回参数是实际能够取出的令牌数量。

内部实现

 func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {    // 如果需要取出的令牌数小于等于零,那么就返回0个令牌     if count  tb.availableTokens {       count = tb.availableTokens    }     // 调整令牌数    tb.availableTokens -= count    return count }

登录后复制

如果需要取出的令牌数小于等于零,那么就返回0个令牌

根据时间对当前桶中令牌数进行计算

计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌

如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数

调整令牌数

调整令牌

 func (tb *Bucket) adjustavailableTokens(tick int64) {    lastTick := tb.latestTick    tb.latestTick = tick     // 如果当前令牌数大于最大等于容量,直接返回最大容量    if tb.availableTokens >= tb.capacity {       return    }     // 当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)    tb.availableTokens += (tick - lastTick) * tb.quantum     // 如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数    if tb.availableTokens > tb.capacity {       tb.availableTokens = tb.capacity    }    return }

登录后复制

如果当前令牌数大于最大等于容量,直接返回最大容量

当前令牌数 += (当前时间 – 上次取出令牌数的时间) * quannum(每次生成令牌量)

如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数

实现原理

加锁 defer 解锁

判断count(想要取出的令牌数) 是否小于等于 0,如果是直接返回 0

调用函数adjustTokens 获取可用的令牌数量

如果当前可以取出的令牌数小于等于0 直接返回 0

如果当前可以取出的令牌数小于当前想要取出的令牌数(count) count = 当前可以取出的令牌数

当前的令牌数 -= 取出的令牌数 (count)

返回 count(可以取出的令牌数)

额外介绍

take函数,能够返回等待时间和布尔值,允许欠账,没有令牌也可以取出。

func (tb *Bucket) Take(count int64) time.Duration

takeMaxDuration函数,可以根据最大等待时间来进行判断。

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)

因为他们内部的实现都基于令牌调整,我这里不做过多介绍,如果感兴趣可以自行研究一下。

测试

 func ratelimit2() func(ctx *gin.Context) {     // 生成速率 最大容量     r2 := rate2.NewBucket(time.Second, 200)     return func(ctx *gin.Context) {         //r2.Take() // 允许欠账,令牌不够也可以接收请求         if r2.TakeAvailable(1) == 1 {             // 如果想要取出1个令牌并且能够取出,就放行             ctx.Next()             return         }         response(ctx, http.StatusRequestTimeout, "rate2 limit...")         ctx.Abort()         return     } }

登录后复制

image.png压测速度过于快速,在实际过程中可以根据调整令牌生成速率来进行具体限流!

小结

令牌桶可以允许自己判断请求是否继续,内部不会进行睡眠操作。而漏桶需要进行睡眠,并没有提供方法让程序员进行判断是否放行。

【相关推荐:Go视频教程、编程教学】

以上就是一文聊聊go语言中的限流漏桶和令牌桶库的详细内容,更多请关注【创想鸟】其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至253000106@qq.com举报,一经查实,本站将立刻删除。

发布者:PHP中文网,转转请注明出处:https://www.chuangxiangniao.com/p/2414504.html

(0)
上一篇 2025年3月3日 00:40:02
下一篇 2025年2月26日 12:01:47

AD推荐 黄金广告位招租... 更多推荐

相关推荐

  • go Realize是什么

    Realize是一个Go工具,专注于加快和改善开发人员的工作流程,自动化用户的工作流程,集成第三方的其他工具,定义自定义cli命令,并在每次更改时重新加载项目,而不必停止编写代码;执行“go get github.com/tockins/r…

    2025年3月3日
    200
  • go语言eof错误是什么

    在go语言中,eof是指文件结尾错误,是Go语言中最重要的错误变量,存在于io包中,用于表示输入流的结尾。因为每个文件都有一个结尾,所以“io.EOF”很多时候并不能算是一个错误,它更重要的是表示一个输入流结束了。 本教程操作环境:wind…

    2025年3月3日
    200
  • 一文浅析Golang中的nil和零值

    作为一个长期从事Java开发的人员,我痴迷于null检查和处理null值。在golang中,故事有些不同。在这篇文章中,我将尝试描述在golang中如何使用nil和零值。 非空和空类型 在go中类型可以是空或非空。 非空类型永远不能为nil…

    2025年3月3日
    200
  • GO goutil发布了v0.6.4版!更新速览!

    本篇文章带大家聊聊go工具库goutil的最新更新情况,介绍了gookit/goutil go常用功能的扩展工具库,都包含哪些知识,附更新日志,希望对大家有所帮助! gookit/goutil Go 常用功能的扩展工具库。包含:数字,字符串…

    2025年3月3日
    200
  • Go语言怎么判断结构体是否存在某方法?两种方式介绍

    go语言怎么判断结构体是否存在某方法?下面本篇文章给大家介绍一下golang判断结构体是否存在某方法的两种方式(附代码示例),希望对大家有所帮助! go 有时需要判断某个结构体是不是有某个方法,但是可能突然就一脸茫然,go 也可以像 php…

    2025年3月3日
    200
  • go语言是什么编程语言

    go语言是是Google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。Go是一种过程编程语言,可用于快速机器代码编译;它提供了并发机制,可以轻松开发多核和联网的机器级程序;它提供对接口和类型嵌入的支持。 本教程操作环境…

    2025年3月3日
    200
  • golang怎么实现文件监控

    在golang中,可以利用fsnotify来实现文件监控。fsnotify是go语言跨平台文件系统监控工具,实现了一个基于channel的、跨平台的实时监听接口;golang通过fsnotify可监控文件,并通过文件变化重启程序。 本教程操…

    2025年3月3日 编程技术
    200
  • 详解怎么使用Golang爬取必应壁纸

    做爬虫不用说,就是用python就好,一个requests包走天下。但是呢,听说golang中内置的http包非常牛逼,咱就是说不得整点活,也刚好学习学习新东西,复习下http协议的请求和响应相关的知识点。话不多说,咱直接开整 本文章爬下必…

    2025年3月3日 编程技术
    200
  • go语言怎么获取类型信息

    在go语言中,可以通过反射来获取类型信息;只需要调用reflect包的TypeOf()函数即可。方法:1、使用“reflect.TypeOf()”函数取得指定变量的类型对象;2、使用Name()和Kind()获取类型对象的类型信息,语法“类…

    2025年3月3日
    200
  • 聊聊Go怎么实现SSE?需要注意什么?

    本篇文章给大家带来了关于go的相关知识,其中主要跟大家聊一聊go用什么方式实现sse,以及需要注意的事项,感兴趣的朋友下面一起来看一下吧,希望对大家有帮助。 一、服务端代码 package mainimport (   “fmt”   “n…

    2025年3月3日
    200

发表回复

登录后才能评论