ASP.Net Core 3.X併發限制簡介

在Asp.net core 3.0 以前使用asp.net core 會出現線程池內線程不足的狀況,出現併發衝突現象,web

併發衝突主要狀況以下:數據庫

一、用戶導航到實體編輯頁面;服務器

二、第一個用戶的更改還未寫入數據庫以前,另外一個用戶更新實體多線程

此時若是未啓用併發檢測,當發生更新時最後一個更新優先,即最後一個更新的值保存到數據庫,而第一個保存的數據將丟失;併發

那麼如何處理併發衝突呢?app

1】能夠跟蹤用戶已經修改的屬性,並只更新數據庫中相應的列,這樣,當兩個用戶更新了不一樣的屬性,下次查看時都將生效。asp.net

可是也存在如下問題:異步

【1】當對同一個屬性進行競爭性更改的話,沒法避免數據的流失async

【2】一般不適用於web應用。它須要維持重要的狀態,以便跟蹤全部提取值和新值。維持大量狀態可能影響應用性能;性能

【3】可能會增長應用複雜性(與實體上的開發檢測相比)

 

2】客戶端優先

即客戶端的值優先於數據庫存儲的值。而且若是不對併發處理進行任何編碼,將自動進行客戶端優先;

3】 存儲優先

這種方式能夠阻止在數據可對數據的更改,而且能夠

 【1】顯示錯誤消息

 【2】顯示數據的當前狀態

 【3】容許用戶從新更新應用的更改

 

如何處理併發呢?

1】檢測屬性的併發衝突,能夠使用ConcurrencyCheck特性在屬性級別檢測併發衝突。該特性可應用於模型上的多個屬性

2】檢測行的併發衝突,檢測行的併發衝突,將rowversion耿總列添加到模型

3】當屬性爲併發令牌時:

   1)EFCore 驗證提取屬性後是否未更改屬性,調用SaveChanges或者SaveChangesAsync時會執行併發檢測

  2)若是提取屬性後更改了屬性,將引起DbUpdateConcurrencyException

   數據庫和數據模型必須配置爲支持引起DbUpdateConcurrencyException.

 

 

4]在Asp.net core 3.0以後  

 微軟在asp.net core 3.0 增長了Microsoft.AspNetCore.ConcurrencyLimiter,用於傳入的請求進行排隊處理

避免線程池的不足;

Queue策略

一、添加Nuget    Install-Package Microsoft.AspNetCore.ConcurrencyLimiter

public void ConfigureServices(IServiceCollection services)
        {
            services.AddQueuePolicy(options =>
            {
                //最大併發請求數
                options.MaxConcurrentRequests = 2;
                //請求隊列長度限制
                options.RequestQueueLimit = 1;
            });
            services.AddControllers();
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            //添加併發限制中間件
            app.UseConcurrencyLimiter();
            app.Run(async context =>
            {
                Task.Delay(100).Wait(); // 100ms sync-over-async

                await context.Response.WriteAsync("Hello World!");
            });
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }

經過上面簡單的配置,咱們就能夠將他引入到咱們的代碼中,從而作併發量限制,以及隊列的長度;那麼問題來了,他是怎麼實現的呢?

public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
        services.Configure(configure);
        services.AddSingleton<IQueuePolicy, QueuePolicy>();
        return services;
}

QueuePolicy  採用的是SemaphoreSlim 信號量設計,SemapHoreSlim、SemapHore(信號量)支持併發多線程進入被保護代碼,對象在初始化時會指定最大任務數量,當線程請求訪問資源,信號量遞減而當他們釋放時信號量數量遞增。

public QueuePolicy(IOptions<QueuePolicyOptions> options)
        {
            _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
            if (_maxConcurrentRequests <= 0)
            {
                throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer.");
            }

            _requestQueueLimit = options.Value.RequestQueueLimit;
            if (_requestQueueLimit < 0)
            {
                throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
            }
            //使用SemaphoreSlim來限制任務最大個數
            _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
        }

 

ConcurrencyLimiterMiddleware中間件

public async Task Invoke(HttpContext context)
        {
            var waitInQueueTask = _queuePolicy.TryEnterAsync();

            // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
            bool result;

            if (waitInQueueTask.IsCompleted)
            {
                ConcurrencyLimiterEventSource.Log.QueueSkipped();
                result = waitInQueueTask.Result;
            }
            else
            {
                using (ConcurrencyLimiterEventSource.Log.QueueTimer())
                {
                    result = await waitInQueueTask;
                }
            }

            if (result)
            {
                try
                {
                    await _next(context);
                }
                finally
                {
                    _queuePolicy.OnExit();
                }
            }
            else
            {
                ConcurrencyLimiterEventSource.Log.RequestRejected();
                ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
                context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
                await _onRejected(context);
            }
        }

每次當咱們請求的時候首先會調用_queuePolicy.TryEnterAsync(),進入該方法後先開啓一個私有lock鎖,再接着判斷總請求量是否≥(請求隊列限制的大小+最大併發請求數),若是當前數量超出了,那麼我直接拋出,送你個503狀態;

if (result)
  {
         try
         {
             await _next(context);
         }
         finally
        {
            _queuePolicy.OnExit();
        }
        }
        else
        {
            ConcurrencyLimiterEventSource.Log.RequestRejected();
            ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
            context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
            await _onRejected(context);
        }

問題來了,我這邊若是說還沒到你設置的大小呢,我這個請求沒有給你服務器造不成壓力,那麼你給我處理一下吧.
await _serverSemaphore.WaitAsync();異步等待進入信號量,若是沒有線程被授予對信號量的訪問權限,則進入執行保護代碼;不然此線程將在此處等待,直到信號量被釋放爲止

 

lock (_totalRequestsLock)
    {
        if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
        {
             return false;
        }
            TotalRequests++;
        }
        //異步等待進入信號量,若是沒有線程被授予對信號量的訪問權限,則進入執行保護代碼;不然此線程將在此處等待,直到信號量被釋放爲止
        await _serverSemaphore.WaitAsync();
        return true;
    }

返回成功後那麼中間件這邊再進行處理,_queuePolicy.OnExit();經過該調用進行調用_serverSemaphore.Release();釋放信號燈,再對總請求數遞減

相關文章
相關標籤/搜索