统一流控服务开源:基于.Net Core的流控服务 统一流
先前有一篇博文,梳理了流控服务的场景、业界做法和常用算法 统一流控服务开源-1:场景&业界做法&算法篇 最近完成了流控服务的开发,并在生产系统进行了大半年的验证,稳定可靠。今天整理一下核心设计和实现思路,开源到Github上,分享给大家 ? ? ?https://github.com/zhouguoqing/FlowControl ?一、令牌桶算法实现 ? 先回顾一下令牌桶算法示意图 ?? ?? ? 随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)?往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水), ? ? 如果桶已经满了就不再加了. 新请求来临时,?会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务. ? ? 令牌添加速度支持动态变化,实时控制处理的速率. ? 令牌桶有两个关键的属性:令牌桶容量(大小)和时间间隔, ? 有两个关键操作,从令牌桶中取Token;令牌桶定时的Reset重置。 ? 我们看TokenBucket类: using System; namespace CZ.FlowControl.Service { CZ.FlowControl.Spi; /// <summary> /// 令牌桶 </summary> public abstract class TokenBucket : IThrottleStrategy { protected long bucketTokenCapacity; private static readonly object syncRoot = new object(); ticksRefillInterval; nextRefillTime; //number of tokens in the bucket tokens; protected TokenBucket(long bucketTokenCapacity,long refillInterval,1)"> refillIntervalInMilliSeconds) { if (bucketTokenCapacity <= 0) throw new ArgumentOutOfRangeException("bucketTokenCapacity",bucket token capacity can not be negative"); if (refillInterval < refillIntervalRefill interval cannot be negativeif (refillIntervalInMilliSeconds <= refillIntervalInMilliSecondsRefill interval in milliseconds cannot be negative); this.bucketTokenCapacity = bucketTokenCapacity; ticksRefillInterval = TimeSpan.FromMilliseconds(refillInterval * refillIntervalInMilliSeconds).Ticks; } <summary> 是否流控 </summary> <param name="n"></param> <returns></returns> bool ShouldThrottle(long n = 1) { TimeSpan waitTime; return ShouldThrottle(n,1)">out waitTime); } bool ShouldThrottle(long n,1)"> TimeSpan waitTime) { if (n <= 0) nShould be positive integerlock (syncRoot) { UpdateTokens(); if (tokens < n) { var timeToIntervalEnd = nextRefillTime - SystemTime.UtcNow.Ticks; if (timeToIntervalEnd < waitTime); waitTime = TimeSpan.FromTicks(timeToIntervalEnd); return true; } tokens -= n; waitTime = TimeSpan.Zero; false; } } 更新令牌 </summary> void UpdateTokens(); return ShouldThrottle(1,1)"> waitTime); } CurrentTokenCount { get { (syncRoot) { UpdateTokens(); return tokens; } } } } } ?这个抽象类中,将UpdateToken作为抽象方法暴露出来,给实现类更多的灵活去控制令牌桶重置操作。基于此实现了“固定令牌桶”FixedTokenBucket /// 固定令牌桶 FixedTokenBucket : TokenBucket { public FixedTokenBucket(long maxTokens,1)"> refillIntervalInMilliSeconds) : base(maxTokens,refillInterval,refillIntervalInMilliSeconds) { } override UpdateTokens() { var currentTime = SystemTime.UtcNow.Ticks; if (currentTime < nextRefillTime) ; tokens = bucketTokenCapacity; nextRefillTime = currentTime + ticksRefillInterval; } } ? ?固定令牌桶在每次取Token时,都要执行方法ShouldThrottle。这个方法中: ? ?并发取Token是线程安全的,这个地方用了Lock控制,损失了一部分性能。同时每次获取可用Token的时候,都会实时Check一下是否需要到达Reset令牌桶的时间。 ? ?获取到可用令牌后,令牌桶中令牌的数量-1。如果没有足够的可用令牌,则返回等待到下次Reset令牌桶的时间。如下代码: ); lock (syncRoot) { UpdateTokens(); ; } tokens -=; } } ? ?以上就是令牌桶算法的实现。我们继续看漏桶算法。 ?二、漏桶算法实现 ? 首先回顾一下漏桶算法的原理: ??‘ ?? ? 水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率), ? ? 当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求, ? ? 可以看出漏桶算法能强行限制数据的传输速率. ? ? 有两个变量: ?
? ? ?漏桶抽象类:LeakTokenBucket,继承与令牌桶抽象父类 TokenBucket,说明了获取令牌(漏出令牌)在底层的方式是一致的,不一样的是重置令牌的方式(务必理解这一点) CZ.FlowControl.Service { 漏桶 LeakyTokenBucket : TokenBucket { stepTokens; ticksStepInterval; protected LeakyTokenBucket(int refillIntervalInMilliSeconds,long stepTokens,1)">long stepInterval,1)"> stepIntervalInMilliseconds) : this.stepTokens = stepTokens; if (stepInterval < stepIntervalStep interval cannot be negativeif (stepTokens < stepTokensStep tokens cannot be negativeif (stepIntervalInMilliseconds <= stepIntervalInMillisecondsStep interval in milliseconds cannot be negative); ticksStepInterval = TimeSpan.FromMilliseconds(stepInterval * stepIntervalInMilliseconds).Ticks; } } } ? ? 可以看出,漏桶是在令牌桶的基础上增加了二个重要的属性:这两个属性决定了重置令牌桶的方式 ? ? stepTokens:每间隔时间内漏的数量 ? ? ticksStepInterval:漏的间隔时间 ? ? 举个例子:TPS 100,即每秒漏出100个Token,stepTokens =100,?ticksStepInterval=1000ms ? ? 漏桶的具体实现有两种:空桶和满桶 ? ??StepDownTokenBucket 满桶:即一把将令牌桶填充满 漏桶(满桶) </summary> <remarks> StepDownLeakyTokenBucketStrategy resembles a bucket which has been filled with tokens at the beginning but subsequently leaks tokens at a fixed interval </remarks> StepDownTokenBucket : LeakyTokenBucket { public StepDownTokenBucket(int refillIntervalInMilliSeconds,1)">int stepIntervalInMilliseconds) : if (currentTime >= nextRefillTime) { set tokens to max tokens = bucketTokenCapacity; compute next refill time nextRefillTime = currentTime + ticksRefillInterval; ; } calculate max tokens possible till the end var timeToNextRefill = nextRefillTime - currentTime; var stepsToNextRefill = timeToNextRefill/ticksStepInterval; var maxPossibleTokens = stepsToNextRefill*stepTokens; if ((timeToNextRefill%ticksStepInterval) > 0) maxPossibleTokens +=if (maxPossibleTokens < tokens) tokens = maxPossibleTokens; } } } ? ?StepUpLeakyTokenBucket 空桶:即每次只将stepTokens个数的令牌放到桶中? ? 1 System; 2 3 CZ.FlowControl.Service 4 { 5 <summary> 6 漏桶(空桶) 7 </summary> 8 <remarks> 9 StepUpLeakyTokenBucketStrategy resemembles an empty bucket at the beginning but get filled will tokens over a fixed interval. 10 </remarks> 11 StepUpLeakyTokenBucket : LeakyTokenBucket 12 { 13 lastActivityTime; 14 15 public StepUpLeakyTokenBucket( stepIntervalInMilliseconds) 16 : 17 { 18 } 19 20 UpdateTokens() 21 22 SystemTime.UtcNow.Ticks; 23 24 nextRefillTime) 25 { 26 tokens = stepTokens; 27 28 lastActivityTime = currentTime; 29 nextRefillTime = currentTime + ticksRefillInterval; 30 31 ; 32 } 33 34 calculate tokens at current step 35 36 long elapsedTimeSinceLastActivity = currentTime -37 long elapsedStepsSinceLastActivity = elapsedTimeSinceLastActivity / ticksStepInterval; 38 39 tokens += (elapsedStepsSinceLastActivity*stepTokens); 40 41 if (tokens > bucketTokenCapacity) tokens = bucketTokenCapacity; 42 lastActivityTime =43 44 } 45 } ?三、流控服务封装 ? 第二章节,详细介绍了令牌桶和漏桶的具体实现。基于以上,要重点介绍接口:IThrottleStrategy:流控的具体方式 CZ.FlowControl.Spi { 流量控制算法策略 interface IThrottleStrategy { ); <param name="waitTime"></param> TimeSpan waitTime); 当前令牌个数 long CurrentTokenCount { ; } } } ? ? 有了这个流控方式接口后,我们还需要一个流控策略定义类:FlowControlStrategy ? ? 即定义具体的流控策略:以下是这个类的详细属性和成员:? 不仅定义了流控策略类型,还定义了流控的维度信息和流控阈值,这样流控就做成依赖注入的方式了!? System.Collections.Generic; System.Text; 流控策略 FlowControlStrategy { 标识 string ID { get; set; } 名称 string Name { 流控策略类型 public FlowControlStrategyType StrategyType { 流控阈值-Int long IntThreshold { 流控阈值-Double decimal DoubleThreshold { 时间区间跨度 public FlowControlTimespan TimeSpan { ; } private Dictionary<string,1)">string> flowControlConfigs; 流控维度信息 public Dictionary< FlowControlConfigs { if (flowControlConfigs == null) flowControlConfigs = new Dictionary<(); flowControlConfigs; } { flowControlConfigs = value; } } 描述 string Descriptions { 触发流控后是否直接拒绝请求 </summary> bool IsRefusedRequest { 创建时间 public DateTime CreateTime { 创建人 string Creator { 最后修改时间 public DateTime LastModifyTime { 最后修改人 string LastModifier { ; } } } ? ?同时,流控策略类型,我们抽象了一个枚举:FlowControlStrategyType ? ?支持3种流控策略:TPS、Sum(指定时间段内请求的次数),Delay延迟 流控策略类型枚举 enum FlowControlStrategyType { TPS控制策略 TPS, 总数控制策略 Sum,1)"> 延迟控制策略 Delay } } ? 面向每种流控策略类型,提供了一个对应的流控器,比如说TPS的流控器 TPSFlowController,内部使用了固定令牌桶算法
CZ.FlowControl.Spi; TPS流量控制器 TPSFlowController : IFlowController { public IThrottleStrategy InnerThrottleStrategy { ; } public FlowControlStrategy FlowControlStrategy { return InnerThrottleStrategy.ShouldThrottle(n,1)"> TPSFlowController(FlowControlStrategy strategy) { FlowControlStrategy = strategy; InnerThrottleStrategy = new FixedTokenBucket(strategy.IntThreshold,1)">1000); } } } ? Sum(指定时间段内请求的次数)流控器: ?? System.IO; System.Linq; 一段时间内合计值流量控制器 SumFlowController : IFlowController { SumFlowController(FlowControlStrategy strategy) { FlowControlStrategy = strategy; var refillInterval = GetTokenBucketRefillInterval(strategy); InnerThrottleStrategy = ); } GetTokenBucketRefillInterval(FlowControlStrategy strategy) { long refillInterval = ; switch (strategy.TimeSpan) { case FlowControlTimespan.Second: refillInterval = ; break; FlowControlTimespan.Minute: refillInterval = 60 FlowControlTimespan.Hour: refillInterval = 60 * FlowControlTimespan.Day: refillInterval = 24 * refillInterval; } } } ? 同时,通过一个创建者工厂,根据不同的流控策略,创建对应的流控器(做了一层缓存,性能更好): 流控策略工厂 FlowControllerFactory { static Dictionary< fcControllers; object syncObj = (); static FlowControllerFactory instance; private FlowControllerFactory() { fcControllers = (); } FlowControllerFactory GetInstance() { if (instance == ) { (syncObj) { ) { instance = new FlowControllerFactory(); } } } instance; } IFlowController GetOrCreateFlowController(FlowControlStrategy strategy) { if (strategy == new ArgumentNullException(FlowControllerFactory.GetOrCreateFlowController.strategyif (!fcControllers.ContainsKey(strategy.ID)) { fcControllers.ContainsKey(strategy.ID)) { var fcController = CreateFlowController(strategy); if (fcController != ) fcControllers.Add(strategy.ID,fcController); } } } if (fcControllers.ContainsKey(strategy.ID)) { var controller = fcControllers[strategy.ID]; controller; } IFlowController CreateFlowController(FlowControlStrategy strategy) { FlowControllerFactory.CreateFlowController.strategy); IFlowController controller = (strategy.StrategyType) { FlowControlStrategyType.TPS: controller = TPSFlowController(strategy); FlowControlStrategyType.Delay: controller = DelayFlowController(strategy); FlowControlStrategyType.Sum: controller = SumFlowController(strategy); default: controller; } } } ? ? ?有了流控策略定义、我们更进一步,继续封装了流控Facade服务,这样把流控的变化封装到内部。对外只提供流控服务接口,流控时动态传入流控策略和流控个数:FlowControlService ? ? CZ.FlowControl.Spi; System.Threading; 统一流控服务 FlowControlService { 流控 <param name="strategy">流控策略</param> <param name="count">请求次数</param> void FlowControl(FlowControlStrategy strategy,1)">int count = ) { FlowControllerFactory.GetInstance().GetOrCreateFlowController(strategy); TimeSpan waitTimespan = TimeSpan.Zero; var result = controller.ShouldThrottle(count,1)"> waitTimespan); (result) { if (strategy.IsRefusedRequest == false && waitTimespan != TimeSpan.Zero) { WaitForAvailable(strategy,controller,waitTimespan,count); } else (strategy.IsRefusedRequest) { new Exception(触发流控!); } } } 等待可用 <param name="controller">流控器<param name="waitTimespan">等待时间void WaitForAvailable(FlowControlStrategy strategy,IFlowController controller,TimeSpan waitTimespan,1)"> count) { var timespan = waitTimespan; if (strategy.StrategyType == FlowControlStrategyType.Delay) { Thread.Sleep(timespan); while (controller.ShouldThrottle(count,1)"> timespan)) { Thread.Sleep(timespan); } } } } ? 以上,统一流控服务完成了第一个版本的封装。接下来我们看示例代码 ?四、示例代码 ? ? 先安装Nuget: Install-Package CZ.FlowControl.Service -Version 1.0.0 ? ? ?? ? ? ? ? 是不是很简单。 ? ? 大家如果希望了解详细的代码,请参考这个项目的GitHub地址: ? ??https://github.com/zhouguoqing/FlowControl ? ? 同时也欢迎大家一起改进完善。 ? ?? ? 周国庆 2019/8/9 ? ?? (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- asp.net-web-api – 更好地总是在Web Api中返回HttpRespons
- asp.net-core – “如果从deploy文件夹运行,则需要”库’ho
- asp.net-mvc-5 – 什么是XsrfKey用于,我应该将XsrfId设置为
- ASP.NET MVC中使用区域时的配置错误4
- asp.net – 什么是asp:DropDownList客户端事件?
- 在ASP.NET应用程序中实现多语言的最佳方式
- asp.net-mvc – 使用AD的ASP.NET MVC表单Auth在本地工作但在
- asp.net – 如何在AJAX中显示状态描述?
- asp.net – 为Umbraco预订建议
- asp.net-mvc – EC2上的ASP.net MVC