.NET Core微服務之基於Polly+AspectCore實現熔斷與降級機制

Tip: 此篇已加入.NET Core微服務基礎系列文章索引html

1、熔斷、降級與AOP

1.1 啥是熔斷?

  在廣義的解釋中,熔斷主要是指爲控制股票、期貨或其餘金融衍生產品的交易風險,爲其單日價格波動幅度規定區間限制,一旦成交價觸及區間上下限,交易則自動中斷一段時間(「熔即斷」),或就此「躺平」而不得超過上限或下限(「熔而不斷」)。git

  而對於微服務來講,熔斷就是咱們常說的「保險絲」,意爲當服務出現某些情況時,切斷服務,從而防止應用程序不斷地常識執行可能會失敗的操做形成系統的「雪崩」,或者大量的超時等待致使系統卡死等狀況,不少地方也將其成爲「過載保護」。github

1.2 啥是降級?

  降級的目的就是當某個服務提供者發生故障的時候,向調用方返回一個替代響應或者錯誤響應數據庫

  例如:假設有一個短信服務,其調用聯通接口服務器發送短信服務(假設這裏調用聯通接口最方便,最省事也最經濟)失敗以後,會嘗試改用移動短信服務器(假設這裏調用移動服務器比較不方便也不經濟)發送,若是移動服務器調用也失敗,那麼還會嘗試改用電信短信服務器(假設這裏調用電信服務器最不省事和最不經濟),若是還失敗,則返回「失敗」響應;編程

  降級的另外一個概念也能夠看做是服務的「選擇性放棄」,好比在雙11或618等大型的電商活動日中,在高峯值的情形下,通常的電商系統都會採用部分服務的優先級下降或者乾脆延時或中止服務,以確保主要的服務可以使用最大化的資源爲客戶提供服務。等待峯值降低以後,再經過處理恢復那些降級的服務的原有優先級。json

1.3 啥是AOP?

  AOP(Aspect Oriented Programming)意爲面向切面編程,它是指在運行時,動態地將代碼切入到類的指定方法、指定位置上的編程思想就是面向切面的編程。好比說,咱們在兩個類中,可能都須要在每一個方法中作日誌。按面向對象的設計方法,咱們就必須在兩個類的方法中都加入日誌的內容。也許他們是徹底相同的,但就是由於面向對象的設計讓類與類之間沒法聯繫,而不能將這些重複的代碼統一塊兒來。而AOP就是爲了解決這個問題而生的,通常而言,咱們把切入到指定類指定方法的代碼片斷稱爲切面,而切入到哪些類、哪些方法則叫切入點。有了AOP,咱們就能夠把幾個類共有的代碼,抽取到一個切片中,等到須要時再切入對象中去,從而改變其原有的行爲。api

  AOP是OOP(Object Oriented Programming)的補充,OOP從橫向上區分出一個個的類來,而AOP則從縱向上向對象中加入特定的代碼。有了AOP,OOP變得立體了。關於AOP的更多細節和討論,能夠瀏覽知乎的這篇帖子:《什麼是AOP?緩存

2、Polly的基本使用

2.1 Polly極簡介紹

  Polly是一個被.NET基金會承認的彈性和瞬態故障處理庫,容許咱們以很是順暢和線程安全的方式來執諸如行重試,斷路,超時,故障恢復等策略,其主要功能以下:安全

  • 功能1:重試(Retry)
  • 功能2:斷路器(Circuit-Breaker)
  • 功能3:超時檢測(Timeout)
  • 功能4:緩存(Cache)
  • 功能5:降級(Fallback)

  Polly的策略主要由「故障」和「動做」兩個部分組成,「故障」能夠包括異常、超時等狀況,「動做」則包括Fallback(降級)、重試(Retry)、熔斷(Circuit-Breaker)等。策略則用來執行業務代碼,當業務代碼出現了「故障」中的狀況時就開始執行「動做」。服務器

2.2 Polly基礎使用

  *.這裏只介紹幾個咱們須要用到的功能,其餘功能請瀏覽參考資料關於Polly的部分

  (1)經過NuGet安裝,最新版本:6.0.1

NuGet>Install-Package Polly  

  (2)FallBack => 當出現故障,則進入降級動做

    public static void Case1()
    {
        ISyncPolicy policy = Policy.Handle<ArgumentException>()
            .Fallback(() =>
            {
                Console.WriteLine("Error occured");
            });

        policy.Execute(() =>
        {
            Console.WriteLine("Job Start");

            throw new ArgumentException("Hello Polly!");

            Console.WriteLine("Job End");
        });
    }

  執行結果以下圖所示:這裏捕捉的是ArgumentException, 若是想捕捉全部的Exception,請設置Policy.Handle<Exception>,不過這樣就擴大了範圍。

  

  (3)Retry => 重試,比較容易理解

    public static void Case2()
    {
        ISyncPolicy policy = Policy.Handle<Exception>().Retry(3);

        try
        {
            policy.Execute(() =>
            {
                Console.WriteLine("Job Start");
                if (DateTime.Now.Second % 10 != 0)
                {
                    throw new Exception("Special error occured");
                }
                Console.WriteLine("Job End");
            });
        }
        catch (Exception ex)
        {
            Console.WriteLine("There's one unhandled exception : " + ex.Message);
        }
    }

  執行結果以下圖所示:能夠看到,這裏重試了三次,仍然沒有知足條件(DateTime.Now.Second % 10 == 0),所以進入了外部的未處理異常catch塊中。

  

  (4)CircuitBreaker => 短路保護,當一塊業務代碼/服務 出現了N次錯誤,則把「熔斷器」(保險絲)熔斷,等待一段時間後才容許再次執行,在這段等待的時間內若是再執行則直接拋出BrokenCircuitException異常。這個也很好理解,好比咱們的手機屏幕密碼,若是輸錯了N次以後,手機會拒絕咱們再次輸入,而是讓咱們等待20 ~ 30s 以後再輸入,若是等待以後再輸錯N次,則再次進入等待。

  這裏假設咱們設置一個短路保護策略:當發生了故障的時候,則重試了5次仍是有故障(代碼中的6表明的是在執行短路保護策略以前容許6次故障),那麼久中止服務10s鍾,10s以後再容許重試。

    public static void Case3()
    {
        // Stop for 10s after retry 6 times
        ISyncPolicy policy = Policy.Handle<Exception>()
            .CircuitBreaker(6, TimeSpan.FromSeconds(10));

        while (true)
        {
            try
            {
                policy.Execute(() =>
                {
                    Console.WriteLine("Job Start");
                    throw new Exception("Special error occured");
                    Console.WriteLine("Job End");
                });
            }
            catch (Exception ex)
            {
                Console.WriteLine("There's one unhandled exception : " + ex.Message);
            }

            Thread.Sleep(500);
        }
    }

  執行結果以下圖所示:出現了6次故障以後,直接給咱們跑出了短路保護的異常,「The circuit is now open and is not allowing calls」.

  

  (5)Timeout 與 Wrap => Wrap是指策略封裝,能夠把多個ISyncPolicy合併到一塊兒執行。Timeout則是指超時處理,可是超時策略通常不能直接使用,而是其其餘策略封裝到一塊兒使用。

  這裏咱們封裝兩個策略,一個是基本的Fallback,另外一個則是超時策略,若是調用執行時間超過2s則觸發Fallback。

  這裏涉及到Polly中關於超時的兩個策略:一個是悲觀策略(Pessimistic),一個是樂觀策略(Optimistic)。其中,悲觀策略超時後會直接拋異常,而樂觀策略則不會,而只是觸發CancellationTokenSource.Cancel函數,須要等待委託自行終止操做。通常狀況下,咱們都會用悲觀策略。

    public static void Case4()
    {
        try
        {
            ISyncPolicy policyException = Policy.Handle<TimeoutRejectedException>()
                .Fallback(() =>
                {
                    Console.WriteLine("Fallback");
                });
            ISyncPolicy policyTimeout = Policy.Timeout(3, Polly.Timeout.TimeoutStrategy.Pessimistic);
            ISyncPolicy mainPolicy = Policy.Wrap(policyTimeout, policyException);
            mainPolicy.Execute(() =>
            {
                Console.WriteLine("Job Start...");
                Thread.Sleep(5000);
                //throw new Exception();
                Console.WriteLine("Job End...");
            });
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Unhandled exception : {ex.GetType()} : {ex.Message}");
        }
    }

  執行結果以下圖所示:

  

  除此以外,Polly還提供了一些異步方法供調用以實現以上介紹的功能,好比在業務代碼中有一些Http的調用或者IO操做時,不妨用用異步操做來提升一點效率,能夠看下面這個例子:

    public static async void Case5()
    {
        Policy<byte[]> policy = Policy<byte[]>.Handle<Exception>()
            .FallbackAsync(async c =>
            {
                Console.WriteLine("Executed Error!");
                return new byte[0];
            }, async r =>
            {
                Console.WriteLine(r.Exception);
            });

        policy = policy.WrapAsync(Policy.TimeoutAsync(20, TimeoutStrategy.Pessimistic,
            async (context, timespan, task) =>
            {
                Console.WriteLine("Timeout!");
            }));

        var bytes = await policy.ExecuteAsync(async ()=>
        {
            Console.WriteLine("Start Job");
            HttpClient httpClient = new HttpClient();
            var result = await httpClient.GetByteArrayAsync("https://images2018.cnblogs.com/blog/381412/201806/381412-20180606230929894-145212290.png");
            Console.WriteLine("Finish Job");

            return result;
        });

        Console.WriteLine($"Length of bytes : {bytes.Length}");
    }

  執行結果以下圖所示:

  

  至於Polly更多的功能和用法,能夠參閱官方文檔,這裏再也不贅述。

3、AspectCore的基本使用

3.1 爲何要用AOP框架

  從上面的例子能夠看出,若是直接使用Polly,那麼就會形成咱們的業務代碼中混雜大量的業務無關的代碼。因此,咱們會使用AOP的方式來封裝Polly,嗯,首先咱們先找一個支持的.NET Core的AOP框架吧,目前你們都在用AspectCore(國產,做者Lemon),它採用動態動態代理/織入,而且支持異步方法的攔截。

  快快經過NuGet安裝一個吧:

NuGet>Install-Package AspectCore.Core  

3.2 AspectCore的極簡使用

  這裏假設咱們要針對一個類的某些類的某些方法進行攔截,咱們通常會通過一下幾個步驟:

  (1)編寫一個攔截器,通常繼承自AbstractInterceptorAttribute

    /// <summary>
    /// 自定義攔截器
    /// </summary>
    public class CustomInterceptorAttribute : AbstractInterceptorAttribute
    {
        /// <summary>
        /// 每一個被攔截的方法中執行
        /// </summary>
        /// <param name="context"></param>
        /// <param name="next"></param>
        /// <returns></returns>
        public override async Task Invoke(AspectContext context, AspectDelegate next)
        {
            try
            {
                Console.WriteLine("Before service call");
                await next(context); // 執行被攔截的方法
            }
            catch (Exception)
            {
                Console.WriteLine("Service threw an exception");
                throw;
            }
            finally
            {
                Console.WriteLine("After service call");
            }
        }
    }

  這裏咱們經過爲被攔截方法增長一些處理前和處理後的logic來實現AOP。

  (2)編寫須要被代理攔截的類

    /// <summary>
    /// 實現AoP的兩個要求:
    /// 1.public 類
    /// 2.virtual 方法
    /// </summary>
    public class Person
    {
        [CustomInterceptor]
        public virtual void Say(string message)
        {
            Console.WriteLine($"Service calling ... => {message}");
        }
    }

  能夠看到咱們在要攔截的方法Say()的聲明之上加了一個Attribute:CustomInterceptor,正是咱們以前新增的。

  (3)經過AspectCore建立代理對象實現AOP

    public class Program
    {
        public static void Main(string[] args)
        {
            ProxyGeneratorBuilder proxyGeneratorBuilder = new ProxyGeneratorBuilder();
            using (IProxyGenerator proxyGenerator = proxyGeneratorBuilder.Build())
            {
                Person p = proxyGenerator.CreateClassProxy<Person>();
                p.Say("edisonchou.cnblogs.com");
            }
            Console.ReadKey();
        }
    }

  執行結果以下圖所示:

  

  代碼很清晰,再也不解釋。直到這裏,咱們看到了不論是Polly的使用,仍是AspectCore的使用,都存在一些業務無關的聲明代碼,並且咱們須要結合Polly和AspectCore才能完整地實現適合ASP.NET Core的熔斷降級組件,下面咱們就來模仿Spring Cloud中的Hystrix(能夠參考這一篇文章來了解Spring Cloud Hystrix是個啥玩意兒)

4、Polly+AspectCore的結合使用

4.1 封裝一個Hystrix

NuGet>Install-Package Polly

NuGet>Install-Package AspectCore.Core

NuGet>Install-Package Microsoft.Extensions.Caching.Memory  

    [AttributeUsage(AttributeTargets.Method)]
    public class HystrixCommandAttribute : AbstractInterceptorAttribute
    {
        /// <summary>
        /// 最多重試幾回,若是爲0則不重試
        /// </summary>
        public int MaxRetryTimes { get; set; } = 0;

        /// <summary>
        /// 重試間隔的毫秒數
        /// </summary>
        public int RetryIntervalMilliseconds { get; set; } = 100;

        /// <summary>
        /// 是否啓用熔斷
        /// </summary>
        public bool IsEnableCircuitBreaker { get; set; } = false;

        /// <summary>
        /// 熔斷前出現容許錯誤幾回
        /// </summary>
        public int ExceptionsAllowedBeforeBreaking { get; set; } = 3;

        /// <summary>
        /// 熔斷多長時間(毫秒)
        /// </summary>
        public int MillisecondsOfBreak { get; set; } = 1000;

        /// <summary>
        /// 執行超過多少毫秒則認爲超時(0表示不檢測超時)
        /// </summary>
        public int TimeOutMilliseconds { get; set; } = 0;

        /// <summary>
        /// 緩存多少毫秒(0表示不緩存),用「類名+方法名+全部參數ToString拼接」作緩存Key
        /// </summary>

        public int CacheTTLMilliseconds { get; set; } = 0;

        private static ConcurrentDictionary<MethodInfo, Policy> policies 
            = new ConcurrentDictionary<MethodInfo, Policy>();

        private static readonly IMemoryCache memoryCache 
            = new MemoryCache(new Microsoft.Extensions.Caching.Memory.MemoryCacheOptions());

        /// <summary>
        /// HystrixCommandAttribute
        /// </summary>
        /// <param name="fallBackMethod">降級的方法名</param>
        public HystrixCommandAttribute(string fallBackMethod)
        {
            this.FallBackMethod = fallBackMethod;
        }

        public string FallBackMethod { get; set; }

        public override async Task Invoke(AspectContext context, AspectDelegate next)
        {
            //一個HystrixCommand中保持一個policy對象便可
            //其實主要是CircuitBreaker要求對於同一段代碼要共享一個policy對象
            //根據反射原理,同一個方法的MethodInfo是同一個對象,可是對象上取出來的HystrixCommandAttribute
            //每次獲取的都是不一樣的對象,所以以MethodInfo爲Key保存到policies中,確保一個方法對應一個policy實例
            policies.TryGetValue(context.ServiceMethod, out Policy policy);
            lock (policies)//由於Invoke多是併發調用,所以要確保policies賦值的線程安全
            {
                if (policy == null)
                {
                    policy = Policy.NoOpAsync();//建立一個空的Policy
                    if (IsEnableCircuitBreaker)
                    {
                        policy = policy.WrapAsync(Policy.Handle<Exception>().CircuitBreakerAsync(ExceptionsAllowedBeforeBreaking, TimeSpan.FromMilliseconds(MillisecondsOfBreak)));
                    }
                    if (TimeOutMilliseconds > 0)
                    {
                        policy = policy.WrapAsync(Policy.TimeoutAsync(() => TimeSpan.FromMilliseconds(TimeOutMilliseconds), Polly.Timeout.TimeoutStrategy.Pessimistic));
                    }
                    if (MaxRetryTimes > 0)
                    {
                        policy = policy.WrapAsync(Policy.Handle<Exception>().WaitAndRetryAsync(MaxRetryTimes, i => TimeSpan.FromMilliseconds(RetryIntervalMilliseconds)));
                    }
                    Policy policyFallBack = Policy
                    .Handle<Exception>()
                    .FallbackAsync(async (ctx, t) =>
                    {
                        AspectContext aspectContext = (AspectContext)ctx["aspectContext"];
                        var fallBackMethod = context.ServiceMethod.DeclaringType.GetMethod(this.FallBackMethod);
                        Object fallBackResult = fallBackMethod.Invoke(context.Implementation, context.Parameters);
                        //不能以下這樣,由於這是閉包相關,若是這樣寫第二次調用Invoke的時候context指向的
                        //仍是第一次的對象,因此要經過Polly的上下文來傳遞AspectContext
                        //context.ReturnValue = fallBackResult;
                        aspectContext.ReturnValue = fallBackResult;
                    }, async (ex, t) => { });

                    policy = policyFallBack.WrapAsync(policy);
                    //放入
                    policies.TryAdd(context.ServiceMethod, policy);
                }
            }

            //把本地調用的AspectContext傳遞給Polly,主要給FallbackAsync中使用,避免閉包的坑
            Context pollyCtx = new Context();
            pollyCtx["aspectContext"] = context;

            //Install-Package Microsoft.Extensions.Caching.Memory
            if (CacheTTLMilliseconds > 0)
            {
                //用類名+方法名+參數的下劃線鏈接起來做爲緩存key
                string cacheKey = "HystrixMethodCacheManager_Key_" + context.ServiceMethod.DeclaringType
                                                                   + "." + context.ServiceMethod + string.Join("_", context.Parameters);
                //嘗試去緩存中獲取。若是找到了,則直接用緩存中的值作返回值
                if (memoryCache.TryGetValue(cacheKey, out var cacheValue))
                {
                    context.ReturnValue = cacheValue;
                }
                else
                {
                    //若是緩存中沒有,則執行實際被攔截的方法
                    await policy.ExecuteAsync(ctx => next(context), pollyCtx);
                    //存入緩存中
                    using (var cacheEntry = memoryCache.CreateEntry(cacheKey))
                    {
                        cacheEntry.Value = context.ReturnValue;
                        cacheEntry.AbsoluteExpiration = DateTime.Now + TimeSpan.FromMilliseconds(CacheTTLMilliseconds);
                    }
                }
            }
            else//若是沒有啓用緩存,就直接執行業務方法
            {
                await policy.ExecuteAsync(ctx => next(context), pollyCtx);
            }
        }
    }

  這個HystrixCommand並不是我原創,而是引用的楊中科老師在.NET微服務中的代碼,你們也能夠直接經過NuGet安裝這個封裝好的Package:

NuGet>Install-Package RuPeng.HystrixCore

  這裏再也不多講解代碼,由於都有註釋,你們經過一個案例調試如下就瞭解流程了。

4.2 在ASP.NET Core的使用

  (1)爲了簡化代理類對象的注入,不用在ASP.NET Core中再經過ProxyGeneratorBuilder進行注入,咱們引入一個AspectCore的DI擴展包:

NuGet>Install-Package AspectCore.Extensions.DependencyInjection

  (2)改寫Startup類的ConfigureService方法,把返回值從void改成IServiceProvider

    // This method gets called by the runtime. Use this method to add services to the container.
    public IServiceProvider ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();
        .......
        // AoP - AspectCore
        RegisterServices(this.GetType().Assembly, services);
        return services.BuildAspectCoreServiceProvider();
    }

  這裏BuildAspectCoreServiceProvider就是讓AspectCore接管注入。RegisterService方法以下所示:

    private static void RegisterServices(Assembly asm, IServiceCollection services)
    {
        foreach (var type in asm.GetExportedTypes())
        {
            bool hasHystrixCommand = type.GetMethods().Any(m =>
                m.GetCustomAttribute(typeof(HystrixCommandAttribute)) != null);
            if (hasHystrixCommand)
            {
                services.AddSingleton(type);
            }
        }
    }

  這裏使用反射,篩選出那些帶有HystrixCommandAttribute的類進行注入,從而減小一行一行注入的代碼工做量。

  (3)這裏假設咱們須要進行熔斷保護的方法所在類是一個ProductService類,它主要的功能就是經過HttpClient去調用ProductService的某個API,它的定義以下:

    public class ProductService
    {
        [HystrixCommand(nameof(GetAllProductsFallBackAsync),
            IsEnableCircuitBreaker = true,
            ExceptionsAllowedBeforeBreaking = 3,
            MillisecondsOfBreak = 1000 * 5)]
        public virtual async Task<string> GetAllProductsAsync(string productType)
        {
            Console.WriteLine($"-->>Starting get product type : {productType}");
            string str = null;
            str.ToString();
            
            // to do : using HttpClient to call outer service to get product list

            return $"OK {productType}";
        }

        public virtual async Task<string> GetAllProductsFallBackAsync(string productType)
        {
            Console.WriteLine($"-->>FallBack : Starting get product type : {productType}");

            return $"OK for FallBack  {productType}";
        }
    }

  這裏假設咱們主要針對GetAllProductsAsync這個方法進行熔斷保護,假設它會調用另外一個Service的獲取產品的接口,這個接口會訪問核心數據庫,其天天的訪問量很大,咱們對此接口進行熔斷保護,設置在啓用熔斷保護前容許兩次故障(這裏主要指異常),熔斷保護時間爲5s。

  在Controller中,經過構造函數注入:

    [Produces("application/json")]
    [Route("api/Client")]
    public class ClientController : Controller
    {
        private readonly IClientService clientService;
        private readonly ProductService productService;

        public ClientController(IClientService _clientService, ProductService _productService)
        {
            clientService = _clientService;
            productService = _productService;
        }

        [HttpGet("{id}")]
        public async Task<string> Get(int id)
        {
            var product = await productService.GetAllProductsAsync("B");

            return product;
        }
    }

  爲了可以在控制檯中看到熔斷的信息,咱們增長一句Console.WriteLine到HystrixCommandAttribute中:

    // 啓用熔斷保護(CircuitBreaker)
    if (IsEnableCircuitBreaker)
    {
        policy = policy.WrapAsync(Policy.Handle<Exception>()
            .CircuitBreakerAsync(ExceptionsAllowedBeforeBreaking,
            TimeSpan.FromMilliseconds(MillisecondsOfBreak), (ex, ts) =>
            {
                    // assuem to do logging
                    Console.WriteLine($"Service API OnBreak -- ts = {ts.Seconds}s, ex.message = {ex.Message}");
            }, () => {}));
    }

  這樣當Polly啓用熔斷時便會在控制檯中輸出一段消息,實際使用中能夠往日誌中寫一段日誌信息。

  (4)開起內置服務器進行測試

  Step1.藉助命令行啓動一個WebAPI程序

  Step2.藉助Postman/SoapUI等API測試工具,輸入咱們的URL,測試結果以下圖所示:

  

  能夠看到咱們經過在Postman中訪問這個URL從而觸發Service中的異常,兩次異常以後,便進入了熔斷保護時間,此後5s內的訪問都沒有再進行實際代碼的執行,而直接進入了Fallback方法執行降級後的邏輯。5s保護時間以後,則再次進入實際代碼的執行。目前,這個Hystrix還存在一些問題,需繼續完善,還沒法正式投入使用,後續會結合Polly和Ocelot,在API網關處作統一熔斷保護。

5、小結

  本篇首先介紹了一下熔斷、降級以及AOP的基本概念,而後從兩個流行的庫Polly和AspectCore的基本使用開始瞭解如何在.NET Core代碼中實現熔斷機制和AOP,最後經過結合Polly+AspectCore封裝了一個Hystrix來介紹了一下如何在ASP.NET Core程序中如何作到標籤式地快速實現熔斷降級機制。後續,會將Polly與Ocelot結合實踐API網關,在Ocelot網關處作統一熔斷保護。

參考資料

楊中科,《.NET微服務直播課課件(第二版)》

guwei,《談談我對服務熔斷、服務降級的理解

Jeffcky,《已被.NET基金會承認的彈性和瞬態故障處理庫Polly介紹

Lemon,《Asp.Net Core輕量級Aop解決方案:AspectCore

Sunday_Xiao,《服務熔斷保護Spring Cloud Hystrix

Catcher Wong, 《再談Circuit Breaker之使用Polly

Polly官方文檔,https://github.com/App-vNext/Polly

AspectCore官方文檔,https://github.com/dotnetcore/AspectCore-Framework

 

相關文章
相關標籤/搜索