加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

golang.org/x/time/rate 使用说明

发布时间:2020-12-16 19:11:04 所属栏目:大数据 来源:网络整理
导读:接口介绍 type Limiter func NewLimiter func Limiter Allow func Limiter AllowN func Limiter Reserve func Limiter ReserveN func Limiter Wait func Limiter WaitN 测试 AllowN 测试ReserveN 官方链接 本人简书同步地址 接口介绍 type Limiter type Limi
    • 接口介绍
    • type Limiter
    • func NewLimiter
    • func Limiter Allow
    • func Limiter AllowN
    • func Limiter Reserve
    • func Limiter ReserveN
    • func Limiter Wait
    • func Limiter WaitN
      • 测试 AllowN
      • 测试ReserveN

官方链接
本人简书同步地址

接口介绍

type Limiter

type Limiter struct {
    // contains filtered or unexported fields
}

Limter限制时间的发生频率,采用令牌池的算法实现。这个池子一开始容量为b,装满b个令牌,然后每秒往里面填充r个令牌。
由于令牌池中最多有b个令牌,所以一次最多只能允许b个事件发生,一个事件花费掉一个令牌。

Limter提供三中主要的函数 Allow,Reserve,and Wait. 大部分时候使用Wait。

func NewLimiter

func NewLimiter(r Limit,b int) *Limiter

NewLimiter 返回一个新的Limiter。

func (*Limiter) [Allow]

func (lim *Limiter) Allow() bool

Allow 是函数 AllowN(time.Now(),1)的简化函数。

func (*Limiter) AllowN

func (lim *Limiter) AllowN(now time.Time,n int) bool

AllowN标识在时间now的时候,n个事件是否可以同时发生(也意思就是now的时候是否可以从令牌池中取n个令牌)。如果你需要在事件超出频率的时候丢弃或跳过事件,就使用AllowN,否则使用Reserve或Wait.

func (*Limiter) Reserve

func (lim *Limiter) Reserve() *Reservation

Reserve是ReserveN(time.Now(),1).的简化形式。

func (*Limiter) ReserveN

func (lim *Limiter) ReserveN(now time.Time,n int) *Reservation

ReserveN 返回对象Reservation ,标识调用者需要等多久才能等到n个事件发生(意思就是等多久令牌池中至少含有n个令牌)。

如果ReserveN 传入的n大于令牌池的容量b,那么返回false.
使用样例如下:

r := lim.ReserveN(time.Now(),1)
if !r.OK() {
  // Not allowed to act! Did you remember to set lim.burst to be > 0 ?我只要1个事件发生仍然返回false,是不是b设置为了0?
  return
}
time.Sleep(r.Delay())
Act()

如果希望根据频率限制等待和降低事件发生的速度而不丢掉事件,就使用这个方法。
我认为这里要表达的意思就是如果事件发生的频率是可以由调用者控制的话,可以用ReserveN 来控制事件发生的速度而不丢掉事件。如果要使用context的截止日期或cancel方法的话,使用WaitN。

func (*Limiter) Wait

func (lim *Limiter) Wait(ctx context.Context) (err error)

Wait是WaitN(ctx,1)的简化形式。

func (*Limiter) WaitN

func (lim *Limiter) WaitN(ctx context.Context,n int) (err error)

WaitN 阻塞当前直到lim允许n个事件的发生。
- 如果n超过了令牌池的容量大小则报错。
- 如果Context被取消了则报错。
- 如果lim的等待时间超过了Context的超时时间则报错。

测试 AllowN

package main

import (
    "os"
    "time"

    "golang.org/x/time/rate"

    "github.com/op/go-logging"
)

var log = logging.MustGetLogger("example")

// Example format string. Everything except the message has a custom color
// which is dependent on the log level. Many fields have a custom output
// formatting too,eg. the time returns the hour down to the milli second.
var format = logging.MustStringFormatter(
    `%{color}%{time:15:04:05.000} %{shortfunc} ? %{level:.4s} %{id:03x}%{color:reset} %{message}`,)

func main() {

    backend1 := logging.NewLogBackend(os.Stderr,"",0)
    backend2 := logging.NewLogBackend(os.Stderr,0)
    backend2Formatter := logging.NewBackendFormatter(backend2,format)
    backend1Leveled := logging.AddModuleLevel(backend1)
    backend1Leveled.SetLevel(logging.ERROR,"")
    logging.SetBackend(backend1Leveled,backend2Formatter)

    r := rate.Every(1)
    limit := rate.NewLimiter(r,10)
    for {
        if limit.AllowN(time.Now(),8) {
            log.Info("log:event happen")
        } else {
            log.Info("log:event not allow")
        }

    }

}

测试ReserveN

参考YY哥

package main

import (
    "bytes"
    "fmt"
    "io"
    "time"

    "golang.org/x/time/rate"
)

type reader struct {
    r      io.Reader
    limiter *rate.Limiter
}

// Reader returns a reader that is rate limited by
// the given token bucket. Each token in the bucket
// represents one byte.
func NewReader(r io.Reader,l *rate.Limiter) io.Reader {
    return &reader{
        r:      r,limiter:l,}
}

func (r *reader) Read(buf []byte) (int,error) {
    n,err := r.r.Read(buf)
    if n <= 0 {
        return n,err
    }

    now := time.Now()
    rv := r.limiter.ReserveN(now,n)
    if !rv.OK() {
        return 0,fmt.Errorf("Exceeds limiter's burst")
    }
    delay := rv.DelayFrom(now)
    //fmt.Printf("Read %d bytes,delay %dn",n,delay)
    time.Sleep(delay)
    return n,err
}

func main() {
    // Source holding 1MB
    src := bytes.NewReader(make([]byte, 1024*1024))
    // Destination
    dst := &bytes.Buffer{}

    // Bucket adding 100KB every second,holding max 100KB
    limit := rate.NewLimiter(100*1024, 100*1024)

    start := time.Now()

    buf := make([]byte, 10*1024)
    // Copy source to destination,but wrap our reader with rate limited one
    //io.CopyBuffer(dst,NewReader(src,limit),buf)
    r := NewReader(src,limit)
    for{
        if n,err := r.Read(buf); err == nil {
            dst.Write(buf[0:n])
        }else{
            break
        }
    }

    fmt.Printf("Copied %d bytes in %sn",dst.Len(),time.Since(start))
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读