統一流控服務開源:基於.Net Core的流控服務

先前有一篇博文,梳理了流控服務的場景、業界作法和經常使用算法html

統一流控服務開源-1:場景&業界作法&算法篇git

最近完成了流控服務的開發,並在生產系統進行了大半年的驗證,穩定可靠。今天整理一下核心設計和實現思路,開源到Github上,分享給你們github

     https://github.com/zhouguoqing/FlowControl算法

 1、令牌桶算法實現緩存

  先回顧一下令牌桶算法示意圖安全

  

  

  隨着時間流逝,系統會按恆定1/QPS時間間隔(若是QPS=100,則間隔是10ms) 往桶裏加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),併發

 

  若是桶已經滿了就再也不加了. 新請求來臨時, 會各自拿走一個Token,若是沒有Token可拿了就阻塞或者拒絕服務.ide

 

  令牌添加速度支持動態變化,實時控制處理的速率.post

  令牌桶有兩個關鍵的屬性:令牌桶容量(大小)和時間間隔,性能

  有兩個關鍵操做,從令牌桶中取Token;令牌桶定時的Reset重置。

  咱們看TokenBucket類:

using System;

namespace CZ.FlowControl.Service
{
    using CZ.FlowControl.Spi;
    /// <summary>
    /// 令牌桶
    /// </summary>
    public abstract class TokenBucket : IThrottleStrategy
    {
        protected long bucketTokenCapacity;
        private static readonly object syncRoot = new object();
        protected readonly long ticksRefillInterval;
        protected long nextRefillTime;

        //number of tokens in the bucket
        protected long tokens;

        protected TokenBucket(long bucketTokenCapacity, long refillInterval, long refillIntervalInMilliSeconds)
        {
            if (bucketTokenCapacity <= 0)
                throw new ArgumentOutOfRangeException("bucketTokenCapacity", "bucket token capacity can not be negative");
            if (refillInterval < 0)
                throw new ArgumentOutOfRangeException("refillInterval", "Refill interval cannot be negative");
            if (refillIntervalInMilliSeconds <= 0)
                throw new ArgumentOutOfRangeException("refillIntervalInMilliSeconds", "Refill interval in milliseconds cannot be negative");

            this.bucketTokenCapacity = bucketTokenCapacity;
            ticksRefillInterval = TimeSpan.FromMilliseconds(refillInterval * refillIntervalInMilliSeconds).Ticks;
        }

        /// <summary>
        /// 是否流控
        /// </summary>
        /// <param name="n"></param>
        /// <returns></returns>
        public bool ShouldThrottle(long n = 1)
        {
            TimeSpan waitTime;
            return ShouldThrottle(n, out waitTime);
        }
        public bool ShouldThrottle(long n, out TimeSpan waitTime)
        {
            if (n <= 0) throw new ArgumentOutOfRangeException("n", "Should be positive integer");

            lock (syncRoot)
            {
                UpdateTokens();
                if (tokens < n)
                {
                    var timeToIntervalEnd = nextRefillTime - SystemTime.UtcNow.Ticks;
                    if (timeToIntervalEnd < 0) return ShouldThrottle(n, out waitTime);

                    waitTime = TimeSpan.FromTicks(timeToIntervalEnd);
                    return true;
                }
                tokens -= n;

                waitTime = TimeSpan.Zero;
                return false;
            }
        }

        /// <summary>
        /// 更新令牌
        /// </summary>
        protected abstract void UpdateTokens();

        public bool ShouldThrottle(out TimeSpan waitTime)
        {
            return ShouldThrottle(1, out waitTime);
        }

        public long CurrentTokenCount
        {
            get
            {
                lock (syncRoot)
                {
                    UpdateTokens();
                    return tokens;
                }
            }
        }
    }
}

 這個抽象類中,將UpdateToken做爲抽象方法暴露出來,給實現類更多的靈活去控制令牌桶重置操做。基於此實現了「固定令牌桶」FixedTokenBucket

    /// <summary>
    /// 固定令牌桶
    /// </summary>
    class FixedTokenBucket : TokenBucket
    {
        public FixedTokenBucket(long maxTokens, long refillInterval, long refillIntervalInMilliSeconds)
            : base(maxTokens, refillInterval, refillIntervalInMilliSeconds)
        {
        }

        protected override void UpdateTokens()
        {
            var currentTime = SystemTime.UtcNow.Ticks;

            if (currentTime < nextRefillTime)
                return;

            tokens = bucketTokenCapacity;
            nextRefillTime = currentTime + ticksRefillInterval;
        }
    }

   固定令牌桶在每次取Token時,都要執行方法ShouldThrottle。這個方法中:

   併發取Token是線程安全的,這個地方用了Lock控制,損失了一部分性能。同時每次獲取可用Token的時候,都會實時Check一下是否須要到達Reset令牌桶的時間。

   獲取到可用令牌後,令牌桶中令牌的數量-1。若是沒有足夠的可用令牌,則返回等待到下次Reset令牌桶的時間。以下代碼:

        public bool ShouldThrottle(long n, out TimeSpan waitTime)
        {
            if (n <= 0) throw new ArgumentOutOfRangeException("n", "Should be positive integer");

            lock (syncRoot)
            {
                UpdateTokens();
                if (tokens < n)
                {
                    var timeToIntervalEnd = nextRefillTime - SystemTime.UtcNow.Ticks;
                    if (timeToIntervalEnd < 0) return ShouldThrottle(n, out waitTime);

                    waitTime = TimeSpan.FromTicks(timeToIntervalEnd);
                    return true;
                }
                tokens -= n;

                waitTime = TimeSpan.Zero;
                return false;
            }
        }

   以上就是令牌桶算法的實現。咱們繼續看漏桶算法。

 2、漏桶算法實現

  首先回顧一下漏桶算法的原理:

  

  

  水(請求)先進入到漏桶裏,漏桶以必定的速度出水(接口有響應速率),

 

  當水流入速度過大會直接溢出(訪問頻率超過接口響應速率), 而後就拒絕請求,

 

  能夠看出漏桶算法能強行限制數據的傳輸速率.

 

  有兩個變量:

 

  • 一個是桶的大小,支持流量突發增多時能夠存多少的水(burst),
  • 另外一個是水桶漏洞的大小(rate)。

 

   漏桶抽象類:LeakTokenBucket,繼承與令牌桶抽象父類 TokenBucket,說明了獲取令牌(漏出令牌)在底層的方式是一致的,不同的是重置令牌的方式(務必理解這一點)

using System;

namespace CZ.FlowControl.Service
{
    /// <summary>
    /// 漏桶
    /// </summary>
    abstract class LeakyTokenBucket : TokenBucket
    {
        protected readonly long stepTokens;
        protected long ticksStepInterval;

        protected LeakyTokenBucket(long maxTokens, long refillInterval, int refillIntervalInMilliSeconds, 
            long stepTokens, long stepInterval, int stepIntervalInMilliseconds)
            : base(maxTokens, refillInterval, refillIntervalInMilliSeconds)
        {
            this.stepTokens = stepTokens;
            if (stepInterval < 0) throw new ArgumentOutOfRangeException("stepInterval", "Step interval cannot be negative");
            if (stepTokens < 0) throw new ArgumentOutOfRangeException("stepTokens", "Step tokens cannot be negative");
            if (stepIntervalInMilliseconds <= 0) throw new ArgumentOutOfRangeException("stepIntervalInMilliseconds", "Step interval in milliseconds cannot be negative");

            ticksStepInterval = TimeSpan.FromMilliseconds(stepInterval * stepIntervalInMilliseconds).Ticks;
        }
    }
}

    能夠看出,漏桶是在令牌桶的基礎上增長了二個重要的屬性:這兩個屬性決定了重置令牌桶的方式

    stepTokens:每間隔時間內漏的數量

    ticksStepInterval:漏的間隔時間

    舉個例子:TPS 100,即每秒漏出100個Token,stepTokens =100, ticksStepInterval=1000ms

    漏桶的具體實現有兩種:空桶和滿桶

    StepDownTokenBucket 滿桶:即一把將令牌桶填充滿

using System;

namespace CZ.FlowControl.Service
{
    /// <summary>
    /// 漏桶(滿桶)
    /// </summary>
    /// <remarks>
    /// StepDownLeakyTokenBucketStrategy resembles a bucket which has been filled with tokens at the beginning but subsequently leaks tokens at a fixed interval
    /// </remarks>
    class StepDownTokenBucket : LeakyTokenBucket
    {
        public StepDownTokenBucket(long maxTokens, long refillInterval, int refillIntervalInMilliSeconds, long stepTokens, long stepInterval, int stepIntervalInMilliseconds) : base(maxTokens, refillInterval, refillIntervalInMilliSeconds, stepTokens, stepInterval, stepIntervalInMilliseconds)
        {
        }

        protected override void UpdateTokens()
        {
            var currentTime = SystemTime.UtcNow.Ticks;

            if (currentTime >= nextRefillTime)
            {
                //set tokens to max
                tokens = bucketTokenCapacity;

                //compute next refill time
                nextRefillTime = currentTime + ticksRefillInterval;
                return;
            }

            //calculate max tokens possible till the end
            var timeToNextRefill = nextRefillTime - currentTime;
            var stepsToNextRefill = timeToNextRefill/ticksStepInterval;

            var maxPossibleTokens = stepsToNextRefill*stepTokens;

            if ((timeToNextRefill%ticksStepInterval) > 0) maxPossibleTokens += stepTokens;
            if (maxPossibleTokens < tokens) tokens = maxPossibleTokens;
        }
    }
}
View Code

   StepUpLeakyTokenBucket 空桶:即每次只將stepTokens個數的令牌放到桶中   

 1 using System;
 2 
 3 namespace CZ.FlowControl.Service
 4 {
 5     /// <summary>
 6     /// 漏桶(空桶)
 7     /// </summary>
 8     /// <remarks>
 9     ///  StepUpLeakyTokenBucketStrategy resemembles an empty bucket at the beginning but get filled will tokens over a fixed interval.
10     /// </remarks>
11     class StepUpLeakyTokenBucket : LeakyTokenBucket
12     {
13         private long lastActivityTime;
14 
15         public StepUpLeakyTokenBucket(long maxTokens, long refillInterval, int refillIntervalInMilliSeconds, long stepTokens, long stepInterval, int stepIntervalInMilliseconds) 
16             : base(maxTokens, refillInterval, refillIntervalInMilliSeconds, stepTokens, stepInterval, stepIntervalInMilliseconds)
17         {
18         }
19 
20         protected override void UpdateTokens()
21         {
22             var currentTime = SystemTime.UtcNow.Ticks;
23 
24             if (currentTime >= nextRefillTime)
25             {
26                 tokens = stepTokens;
27 
28                 lastActivityTime = currentTime;
29                 nextRefillTime = currentTime + ticksRefillInterval;
30 
31                 return;
32             }
33 
34             //calculate tokens at current step
35 
36             long elapsedTimeSinceLastActivity = currentTime - lastActivityTime;
37             long elapsedStepsSinceLastActivity = elapsedTimeSinceLastActivity / ticksStepInterval;
38 
39             tokens += (elapsedStepsSinceLastActivity*stepTokens);
40 
41             if (tokens > bucketTokenCapacity) tokens = bucketTokenCapacity;
42             lastActivityTime = currentTime;
43         }
44     }
45 }
View Code

 3、流控服務封裝

  第二章節,詳細介紹了令牌桶和漏桶的具體實現。基於以上,要重點介紹接口:IThrottleStrategy:流控的具體方式

using System;

namespace CZ.FlowControl.Spi
{
    /// <summary>
    /// 流量控制算法策略
    /// </summary>
    public interface IThrottleStrategy
    {
        /// <summary>
        /// 是否流控
        /// </summary>
        /// <param name="n"></param>
        /// <returns></returns>
        bool ShouldThrottle(long n = 1);

        /// <summary>
        /// 是否流控
        /// </summary>
        /// <param name="n"></param>
        /// <param name="waitTime"></param>
        /// <returns></returns>
        bool ShouldThrottle(long n, out TimeSpan waitTime);

        /// <summary>
        /// 是否流控
        /// </summary>
        /// <param name="waitTime"></param>
        /// <returns></returns>
        bool ShouldThrottle(out TimeSpan waitTime);

        /// <summary>
        /// 當前令牌個數
        /// </summary>
        long CurrentTokenCount { get; }
    }
}

    有了這個流控方式接口後,咱們還須要一個流控策略定義類:FlowControlStrategy

    即定義具體的流控策略:如下是這個類的詳細屬性和成員:  不只定義了流控策略類型,還定義了流控的維度信息和流控閾值,這樣流控就作成依賴注入的方式了! 

using System;
using System.Collections.Generic;
using System.Text;

namespace CZ.FlowControl.Spi
{
    /// <summary>
    /// 流控策略
    /// </summary>
    public class FlowControlStrategy
    {
        /// <summary>
        /// 標識
        /// </summary>
        public string ID { get; set; }

        /// <summary>
        /// 名稱
        /// </summary>
        public string Name { get; set; }

        /// <summary>
        /// 流控策略類型
        /// </summary>
        public FlowControlStrategyType StrategyType { get; set; }

        /// <summary>
        /// 流控閾值-Int
        /// </summary>
        public long IntThreshold { get; set; }

        /// <summary>
        /// 流控閾值-Double
        /// </summary>
        public decimal DoubleThreshold { get; set; }

        /// <summary>
        /// 時間區間跨度
        /// </summary>
        public FlowControlTimespan TimeSpan { get; set; }

        private Dictionary<string, string> flowControlConfigs;

        /// <summary>
        /// 流控維度信息
        /// </summary>
        public Dictionary<string, string> FlowControlConfigs
        {
            get
            {
                if (flowControlConfigs == null)
                    flowControlConfigs = new Dictionary<string, string>();

                return flowControlConfigs;
            }
            set
            {
                flowControlConfigs = value;
            }
        }

        /// <summary>
        /// 描述
        /// </summary>
        public string Descriptions { get; set; }

        /// <summary>
        /// 觸發流控後是否直接拒絕請求
        /// </summary>        
        public bool IsRefusedRequest { get; set; }

        /// <summary>
        /// 建立時間
        /// </summary>
        public DateTime CreateTime { get; set; }

        /// <summary>
        /// 建立人
        /// </summary>
        public string Creator { get; set; }

        /// <summary>
        /// 最後修改時間
        /// </summary>
        public DateTime LastModifyTime { get; set; }

        /// <summary>
        /// 最後修改人
        /// </summary>
        public string LastModifier { get; set; }
    }
}

   同時,流控策略類型,咱們抽象了一個枚舉:FlowControlStrategyType

   支持3種流控策略:TPS、Sum(指定時間段內請求的次數),Delay延遲

using System;
using System.Collections.Generic;
using System.Text;

namespace CZ.FlowControl.Spi
{
    /// <summary>
    /// 流控策略類型枚舉
    /// </summary>
    public enum FlowControlStrategyType
    {
        /// <summary>
        /// TPS控制策略
        /// </summary>
        TPS,
     /// <summary>
        /// 總數控制策略
        /// </summary>
        Sum,

        /// <summary>
        /// 延遲控制策略
        /// </summary>
        Delay
    }
}

  面向每種流控策略類型,提供了一個對應的流控器,好比說TPS的流控器

TPSFlowController,內部使用了固定令牌桶算法
using System;

namespace CZ.FlowControl.Service
{
    using CZ.FlowControl.Spi;

    /// <summary>
    /// TPS流量控制器
    /// </summary>
    class TPSFlowController : IFlowController
    {
        public IThrottleStrategy InnerThrottleStrategy
        {
            get; private set;
        }

        public FlowControlStrategy FlowControlStrategy { get; private set; }

        public bool ShouldThrottle(long n, out TimeSpan waitTime)
        {
            return InnerThrottleStrategy.ShouldThrottle(n, out waitTime);
        }

        public TPSFlowController(FlowControlStrategy strategy)
        {
            FlowControlStrategy = strategy;

            InnerThrottleStrategy = new FixedTokenBucket(strategy.IntThreshold, 1, 1000);
        }
    }
}

  Sum(指定時間段內請求的次數)流控器:

  

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

namespace CZ.FlowControl.Service
{
    using CZ.FlowControl.Spi;

    /// <summary>
    /// 一段時間內合計值流量控制器
    /// </summary>
    class SumFlowController : IFlowController
    {
        public IThrottleStrategy InnerThrottleStrategy
        {
            get; private set;
        }

        public FlowControlStrategy FlowControlStrategy { get; private set; }

        public bool ShouldThrottle(long n, out TimeSpan waitTime)
        {
            return InnerThrottleStrategy.ShouldThrottle(n, out waitTime);
        }

        public SumFlowController(FlowControlStrategy strategy)
        {
            FlowControlStrategy = strategy;

            var refillInterval = GetTokenBucketRefillInterval(strategy);

            InnerThrottleStrategy = new FixedTokenBucket(strategy.IntThreshold, refillInterval, 1000);
        }

        private long GetTokenBucketRefillInterval(FlowControlStrategy strategy)
        {
            long refillInterval = 0;

            switch (strategy.TimeSpan)
            {
                case FlowControlTimespan.Second:
                    refillInterval = 1;
                    break;
                case FlowControlTimespan.Minute:
                    refillInterval = 60;
                    break;
                case FlowControlTimespan.Hour:
                    refillInterval = 60 * 60;
                    break;
                case FlowControlTimespan.Day:
                    refillInterval = 24 * 60 * 60;
                    break;
            }

            return refillInterval;
        }
    }
}

  同時,經過一個建立者工廠,根據不一樣的流控策略,建立對應的流控器(作了一層緩存,性能更好):

using System;
using System.Collections.Generic;
using System.Text;

namespace CZ.FlowControl.Service
{
    using CZ.FlowControl.Spi;

    /// <summary>
    /// 流控策略工廠
    /// </summary>
    class FlowControllerFactory
    {
        private static Dictionary<string, IFlowController> fcControllers;
        private static object syncObj = new object();

        private static FlowControllerFactory instance;

        private FlowControllerFactory()
        {
            fcControllers = new Dictionary<string, IFlowController>();
        }

        public static FlowControllerFactory GetInstance()
        {
            if (instance == null)
            {
                lock (syncObj)
                {
                    if (instance == null)
                    {
                        instance = new FlowControllerFactory();
                    }
                }
            }

            return instance;
        }

        public IFlowController GetOrCreateFlowController(FlowControlStrategy strategy)
        {
            if (strategy == null)
                throw new ArgumentNullException("FlowControllerFactory.GetOrCreateFlowController.strategy");

            if (!fcControllers.ContainsKey(strategy.ID))
            {
                lock (syncObj)
                {
                    if (!fcControllers.ContainsKey(strategy.ID))
                    {
                        var fcController = CreateFlowController(strategy);
                        if (fcController != null)
                            fcControllers.Add(strategy.ID, fcController);
                    }
                }
            }

            if (fcControllers.ContainsKey(strategy.ID))
            {
                var controller = fcControllers[strategy.ID];
                return controller;
            }

            return null;
        }

        private IFlowController CreateFlowController(FlowControlStrategy strategy)
        {
            if (strategy == null)
                throw new ArgumentNullException("FlowControllerFactory.CreateFlowController.strategy");

            IFlowController controller = null;

            switch (strategy.StrategyType)
            {
                case FlowControlStrategyType.TPS:
                    controller = new TPSFlowController(strategy);
                    break;
                case FlowControlStrategyType.Delay:
                    controller = new DelayFlowController(strategy);
                    break;
                case FlowControlStrategyType.Sum:
                    controller = new SumFlowController(strategy);
                    break;
                default:
                    break;
            }

            return controller;
        }
    }
}

 

   有了流控策略定義、咱們更進一步,繼續封裝了流控Facade服務,這樣把流控的變化封裝到內部。對外只提供流控服務接口,流控時動態傳入流控策略和流控個數:FlowControlService

   

using System;
using System.Collections.Generic;
using System.Text;

namespace CZ.FlowControl.Service
{
    using CZ.FlowControl.Spi;
    using System.Threading;

    /// <summary>
    /// 統一流控服務
    /// </summary>
    public class FlowControlService
    {
        /// <summary>
        /// 流控
        /// </summary>
        /// <param name="strategy">流控策略</param>
        /// <param name="count">請求次數</param>
        public static void FlowControl(FlowControlStrategy strategy, int count = 1)
        {
            var controller = FlowControllerFactory.GetInstance().GetOrCreateFlowController(strategy);

            TimeSpan waitTimespan = TimeSpan.Zero;

            var result = controller.ShouldThrottle(count, out waitTimespan);
            if (result)
            {
                if (strategy.IsRefusedRequest == false && waitTimespan != TimeSpan.Zero)
                {
                    WaitForAvailable(strategy, controller, waitTimespan, count);
                }
                else if (strategy.IsRefusedRequest)
                {
                    throw new Exception("觸發流控!");
                }
            }
        }

        /// <summary>
        /// 等待可用
        /// </summary>
        /// <param name="strategy">流控策略</param>
        /// <param name="controller">流控器</param>
        /// <param name="waitTimespan">等待時間</param>
        /// <param name="count">請求次數</param>
        private static void WaitForAvailable(FlowControlStrategy strategy, IFlowController controller, TimeSpan waitTimespan, int count)
        {
            var timespan = waitTimespan;
            if (strategy.StrategyType == FlowControlStrategyType.Delay)
            {
                Thread.Sleep(timespan);
                return;
            }

            while (controller.ShouldThrottle(count, out timespan))
            {
                Thread.Sleep(timespan);
            }
        }
    }
}

  以上,統一流控服務完成了第一個版本的封裝。接下來咱們看示例代碼

 4、示例代碼

    先安裝Nuget:

 
Install-Package CZ.FlowControl.Service -Version 1.0.0
 

 

    

   

    是否是很簡單。

    你們若是但願瞭解詳細的代碼,請參考這個項目的GitHub地址:

    https://github.com/zhouguoqing/FlowControl

    同時也歡迎你們一塊兒改進完善。

    

 

周國慶

2019/8/9

相關文章
相關標籤/搜索