Preface
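Microsoft.AspNetCore.ConcurrencyLimiter was added in ASP.NET Core 3.0. It queues incoming requests so that the thread pool does not run out of threads.
In day-to-day development we often configure a web server's maximum number of connections and its request queue size. Today let's look at how to enforce a concurrency limit and a queue length limit in the form of middleware.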
Queue policy
Install-Package Microsoft.AspNetCore.ConcurrencyLimiter
public void ConfigureServices(IServiceCollection services)
{
    services.AddQueuePolicy(options =>
    {
        // maximum number of concurrent requests
        options.MaxConcurrentRequests = 2;
        // request queue length limit
        options.RequestQueueLimit = 1;
    });
    services.AddControllers();
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }

    // add the concurrency limiter middleware before anything that does real work
    app.UseConcurrencyLimiter();

    app.UseHttpsRedirection();
    app.UseRouting();
    app.UseAuthorization();
    app.UseEndpoints(endpoints =>
    {
        endpoints.MapControllers();
    });

    // app.Run is terminal, so it goes last: it handles requests no endpoint matched
    app.Run(async context =>
    {
        Task.Delay(100).Wait(); // 100ms sync-over-async: blocks a thread on purpose to simulate slow work
        await context.Response.WriteAsync("Hello World!");
    });
}
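With the simple configuration above, we can bring it into our code and limit both the number of concurrent requests and the queue length. So the question is: how does it work under the hood?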
public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
    services.Configure(configure);
    services.AddSingleton<IQueuePolicy, QueuePolicy>();
    return services;
}
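QueuePolicy is built on the SemaphoreSlim semaphore. SemaphoreSlim and Semaphore both let several threads enter the protected code at the same time: the object is initialized with a maximum count, the count goes down each time a thread is granted access to the resource, and it goes back up each time a thread releases it.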
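To make the counting concrete, here is a minimal C# sketch that uses only System.Threading.SemaphoreSlim; the class name SemaphoreCountDemo is hypothetical. It starts the semaphore at 2, the same value as MaxConcurrentRequests in the configuration above, and records CurrentCount as holders enter and leave:

```csharp
using System.Threading;

public static class SemaphoreCountDemo
{
    // Records SemaphoreSlim.CurrentCount at each step, plus whether a third caller could get in.
    public static (int[] Counts, bool ThirdEntered) Run()
    {
        // Initial count 2: at most two holders at once, like MaxConcurrentRequests = 2.
        using var semaphore = new SemaphoreSlim(2);
        var counts = new int[4];

        counts[0] = semaphore.CurrentCount;   // 2: nobody inside yet
        semaphore.Wait();                     // first holder enters, count goes down
        counts[1] = semaphore.CurrentCount;   // 1
        semaphore.Wait();                     // second holder enters
        counts[2] = semaphore.CurrentCount;   // 0: no slot left

        // Wait(0) never blocks: it returns false immediately when no slot is free.
        bool thirdEntered = semaphore.Wait(0);

        semaphore.Release();                  // one holder leaves, count goes back up
        counts[3] = semaphore.CurrentCount;   // 1
        return (counts, thirdEntered);
    }
}
```

The third caller is refused here only because Wait(0) asks not to wait at all; QueuePolicy calls WaitAsync() instead, so such a caller waits for a slot rather than failing.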
/// <summary>
/// Constructor (initializes the queue policy)
/// </summary>
/// <param name="options"></param>
public QueuePolicy(IOptions<QueuePolicyOptions> options)
{
    _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
    if (_maxConcurrentRequests <= 0)
    {
        throw new ArgumentException("MaxConcurrentRequests must be a positive integer.", nameof(_maxConcurrentRequests));
    }

    _requestQueueLimit = options.Value.RequestQueueLimit;
    if (_requestQueueLimit < 0)
    {
        throw new ArgumentException("The RequestQueueLimit cannot be a negative number.", nameof(_requestQueueLimit));
    }

    // SemaphoreSlim caps how many requests can run at the same time
    _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
}
ConcurrencyLimiterMiddleware middleware
/// <summary>
/// Invokes the logic of the middleware.
/// </summary>
/// <param name="context">The <see cref="HttpContext"/>.</param>
/// <returns>A <see cref="Task"/> that completes when the request leaves.</returns>
public async Task Invoke(HttpContext context)
{
    var waitInQueueTask = _queuePolicy.TryEnterAsync();

    // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
    bool result;

    if (waitInQueueTask.IsCompleted)
    {
        ConcurrencyLimiterEventSource.Log.QueueSkipped();
        result = waitInQueueTask.Result;
    }
    else
    {
        using (ConcurrencyLimiterEventSource.Log.QueueTimer())
        {
            result = await waitInQueueTask;
        }
    }

    if (result)
    {
        try
        {
            await _next(context);
        }
        finally
        {
            _queuePolicy.OnExit();
        }
    }
    else
    {
        ConcurrencyLimiterEventSource.Log.RequestRejected();
        ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
        context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
        await _onRejected(context);
    }
}
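Stripped of logging and event counters, the control flow of Invoke can be sketched without ASP.NET Core. In this sketch IAdmissionPolicy, FixedPolicy and LimiterFlow are hypothetical stand-ins for IQueuePolicy and the middleware, and returning 200 on success is a simplification (the real middleware leaves the status code to the rest of the pipeline):

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for IQueuePolicy so the sketch runs without ASP.NET Core.
public interface IAdmissionPolicy
{
    ValueTask<bool> TryEnterAsync();
    void OnExit();
}

// Hypothetical test double: admits or rejects every request and counts OnExit calls.
public sealed class FixedPolicy : IAdmissionPolicy
{
    private readonly bool _admit;
    public int ExitCount { get; private set; }
    public FixedPolicy(bool admit) => _admit = admit;
    public ValueTask<bool> TryEnterAsync() => new ValueTask<bool>(_admit);
    public void OnExit() => ExitCount++;
}

public static class LimiterFlow
{
    // Same shape as Invoke: 'next' stands for the rest of the pipeline,
    // and the return value is the status code the response would get.
    public static async Task<int> InvokeAsync(IAdmissionPolicy policy, Func<Task> next)
    {
        bool admitted = await policy.TryEnterAsync();
        if (!admitted)
        {
            return 503; // rejected: running + queued requests are already at the limit
        }

        try
        {
            await next();
            return 200;
        }
        finally
        {
            // Only admitted requests give their slot back, and they do so even if 'next' throws.
            policy.OnExit();
        }
    }
}
```

The try/finally is the part that matters: OnExit runs only for requests that were admitted, and it still runs when downstream code throws, so a slot is never leaked.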
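Each time a request comes in, the middleware first calls _queuePolicy.TryEnterAsync(). Inside that method a private lock is taken, and the policy then checks whether the total number of requests is ≥ (request queue limit + maximum concurrent requests). If it is, the method returns false right away and you simply get a 503 status: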
if (result)
{
    try
    {
        await _next(context);
    }
    finally
    {
        _queuePolicy.OnExit();
    }
}
else
{
    ConcurrencyLimiterEventSource.Log.RequestRejected();
    ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
    context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
    await _onRejected(context);
}
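Now, what if the count hasn't reached the limits you configured yet? Then my request isn't putting any real pressure on your server, so please go ahead and handle it: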
await _serverSemaphore.WaitAsync();
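This waits asynchronously to enter the semaphore. If a slot is free (the semaphore's count is above zero), the request enters the protected code immediately; otherwise the request waits here until the semaphore is released. Because this is WaitAsync, the waiting does not block a thread.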
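The non-blocking wait can be observed directly. This minimal sketch (the WaitAsyncDemo name is hypothetical) takes the only slot of a SemaphoreSlim(1), checks that a second WaitAsync() hands back an incomplete task, and then checks that Release() lets it finish:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitAsyncDemo
{
    public static async Task<(bool PendingBeforeRelease, bool CompletedAfterRelease)> RunAsync()
    {
        // One slot, as if MaxConcurrentRequests were 1.
        using var semaphore = new SemaphoreSlim(1);

        // The slot is free, so this completes without waiting.
        await semaphore.WaitAsync();

        // No slot left: WaitAsync returns an incomplete Task instead of blocking the thread.
        Task waiter = semaphore.WaitAsync();
        bool pendingBeforeRelease = !waiter.IsCompleted;

        // Releasing the slot lets the waiting caller in; the delay is only a safety timeout.
        semaphore.Release();
        Task finished = await Task.WhenAny(waiter, Task.Delay(TimeSpan.FromSeconds(5)));
        bool completedAfterRelease = finished == waiter;

        return (pendingBeforeRelease, completedAfterRelease);
    }
}
```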
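public async ValueTask<bool> TryEnterAsync()
{
    // false: the request is rejected; true: the request may proceed.
    // _serverSemaphore.Release() is not called here; OnExit() calls it when the request leaves.
    lock (_totalRequestsLock)
    {
        if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
        {
            return false;
        }

        TotalRequests++;
    }

    // Wait asynchronously to enter the semaphore: continue at once if a slot is free,
    // otherwise wait here until one is released.
    await _serverSemaphore.WaitAsync();

    return true;
}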
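Once TryEnterAsync returns true, the middleware runs the rest of the pipeline and then, in its finally block, calls _queuePolicy.OnExit(). OnExit calls _serverSemaphore.Release() to release the semaphore, and then decrements the total request count.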
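Putting TryEnterAsync and OnExit together, the queue policy can be sketched as a small standalone class. SimpleQueuePolicy is a hypothetical, simplified re-creation for illustration, not the library's QueuePolicy; it takes the two limits as constructor arguments instead of IOptions<QueuePolicyOptions>:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical simplified re-creation of the QueuePolicy idea, not the library class.
public sealed class SimpleQueuePolicy
{
    private readonly int _maxConcurrentRequests;
    private readonly int _requestQueueLimit;
    private readonly SemaphoreSlim _serverSemaphore;
    private readonly object _totalRequestsLock = new object();

    // Running + waiting requests.
    public int TotalRequests { get; private set; }

    public SimpleQueuePolicy(int maxConcurrentRequests, int requestQueueLimit)
    {
        if (maxConcurrentRequests <= 0)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrentRequests), "Must be a positive integer.");
        if (requestQueueLimit < 0)
            throw new ArgumentOutOfRangeException(nameof(requestQueueLimit), "Cannot be negative.");

        _maxConcurrentRequests = maxConcurrentRequests;
        _requestQueueLimit = requestQueueLimit;
        _serverSemaphore = new SemaphoreSlim(maxConcurrentRequests);
    }

    public async ValueTask<bool> TryEnterAsync()
    {
        lock (_totalRequestsLock)
        {
            // Already at running + queued capacity: reject (the middleware turns this into a 503).
            if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
                return false;
            TotalRequests++;
        }

        // Completes at once if a slot is free; otherwise the request waits in line.
        await _serverSemaphore.WaitAsync();
        return true;
    }

    public void OnExit()
    {
        _serverSemaphore.Release(); // hand the slot to a waiter, if any
        lock (_totalRequestsLock)
        {
            TotalRequests--;
        }
    }
}
```

With MaxConcurrentRequests = 2 and RequestQueueLimit = 1, at most 2 + 1 = 3 requests are admitted at once: two run, one waits on the semaphore, and a fourth simultaneous request gets false, which the middleware turns into a 503.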
Stack policy
Now let's look at the other approach, the stack policy, and see how it works. First, the code showing how to use it:
public void ConfigureServices(IServiceCollection services)
{
    services.AddStackPolicy(options =>
    {
        // maximum number of concurrent requests
        options.MaxConcurrentRequests = 2;
        // request queue length limit
        options.RequestQueueLimit = 1;
    });
    services.AddControllers();
}
With the configuration above, the stack policy is applied to our application. Next, let's see how it is implemented:
public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
    services.Configure(configure);
    services.AddSingleton<IQueuePolicy, StackPolicy>();
    return services;
}
As you can see, this time the policy is implemented by the StackPolicy class. Let's go through its main methods:
/// <summary>
/// Constructor (initializes the fields)
/// </summary>
/// <param name="options"></param>
public StackPolicy(IOptions<QueuePolicyOptions> options)
{
    // backing storage for the stack
    _buffer = new List<ResettableBooleanCompletionSource>();
    // queue capacity
    _maxQueueCapacity = options.Value.RequestQueueLimit;
    // maximum number of concurrent requests
    _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
    // server slots still free
    _freeServerSpots = options.Value.MaxConcurrentRequests;
}
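When the middleware calls _queuePolicy.TryEnterAsync(), the policy first checks whether any server slots are still free: if _freeServerSpots > 0, it returns true right away and the middleware moves on to the next step. Otherwise the request has to be queued, and if the queue length already equals the configured queue capacity, an earlier request is cancelled to make room. The one cancelled is always the oldest queued request (it completes with false, which the middleware turns into a 503), and the newer requests are kept.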
public ValueTask<bool> TryEnterAsync()
{
    lock (_bufferLock)
    {
        if (_freeServerSpots > 0)
        {
            _freeServerSpots--;
            return _trueTask;
        }

        // if the queue is full, cancel the oldest request
        if (_queueLength == _maxQueueCapacity)
        {
            _hasReachedCapacity = true;
            _buffer[_head].Complete(false);
            _queueLength--;
        }

        var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
        _cachedResettableTCS = null;

        if (_hasReachedCapacity || _queueLength < _buffer.Count)
        {
            _buffer[_head] = tcs;
        }
        else
        {
            _buffer.Add(tcs);
        }
        _queueLength++;

        // increment _head for next time
        _head++;
        if (_head == _maxQueueCapacity)
        {
            _head = 0;
        }

        return tcs.GetValueTask();
    }
}
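The index arithmetic is the tricky part of this method: _head is the next slot to write, and when the buffer is full that same slot holds the oldest entry. The sketch below isolates that idea in a generic bounded stack over a fixed array; DropOldestStack<T> is a hypothetical name, and it stores plain items rather than the library's ResettableBooleanCompletionSource:

```csharp
using System;

// Hypothetical bounded stack on a circular array, using the same index arithmetic as StackPolicy.
public sealed class DropOldestStack<T>
{
    private readonly T[] _buffer;
    private int _head;   // next slot to write; when full, also the slot of the oldest item
    private int _count;

    public DropOldestStack(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _buffer = new T[capacity];
    }

    public int Count => _count;

    // Pushes an item. If the stack was full, the oldest item is overwritten,
    // returned through 'evicted', and the method returns true.
    public bool Push(T item, out T evicted)
    {
        evicted = default;
        bool wasFull = _count == _buffer.Length;
        if (wasFull)
        {
            evicted = _buffer[_head]; // the oldest item sits where the next write goes
            _count--;
        }

        _buffer[_head] = item;
        _count++;
        _head++;
        if (_head == _buffer.Length) _head = 0; // wrap around
        return wasFull;
    }

    // Pops the most recently pushed item (LIFO) by stepping _head backwards.
    public T Pop()
    {
        if (_count == 0) throw new InvalidOperationException("Stack is empty.");
        _head = _head == 0 ? _buffer.Length - 1 : _head - 1;
        T item = _buffer[_head];
        _buffer[_head] = default;
        _count--;
        return item;
    }
}
```

Pushing a, b, c into a stack of capacity 3 and then pushing d evicts a; popping afterwards returns d, c, b, newest first, which matches the order in which StackPolicy hands out freed slots.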
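When a request finishes, the middleware calls _queuePolicy.OnExit(). If requests are waiting, it pops the most recently queued one off the stack, lets it proceed, and decrements the queue length; if nothing is waiting, it simply gives the server slot back by incrementing _freeServerSpots.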
public void OnExit()
{
    lock (_bufferLock)
    {
        if (_queueLength == 0)
        {
            _freeServerSpots++;

            if (_freeServerSpots > _maxConcurrentRequests)
            {
                _freeServerSpots--;
                throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
            }

            return;
        }

        // step backwards and launch a new task
        if (_head == 0)
        {
            _head = _maxQueueCapacity - 1;
        }
        else
        {
            _head--;
        }

        // let the most recently queued request proceed (complete it with true)
        _buffer[_head].Complete(true);
        _queueLength--;
    }
}
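The overall behavior (take a free slot if there is one, otherwise wait; the newest waiter goes first; the oldest waiter is dropped when the queue is full) can also be sketched with a LinkedList<TaskCompletionSource<bool>> instead of the pooled ring buffer. SimpleStackPolicy is a hypothetical simplified version for illustration, not the library's StackPolicy, and rejecting immediately when the queue capacity is 0 is this sketch's own choice:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical simplified re-creation of the StackPolicy idea (LIFO, drop oldest when full).
public sealed class SimpleStackPolicy
{
    private readonly object _lock = new object();
    // Front = oldest waiter, back = newest waiter.
    private readonly LinkedList<TaskCompletionSource<bool>> _waiting = new LinkedList<TaskCompletionSource<bool>>();
    private readonly int _maxQueueCapacity;
    private readonly int _maxConcurrentRequests;
    private int _freeServerSpots;

    public SimpleStackPolicy(int maxConcurrentRequests, int requestQueueLimit)
    {
        _maxConcurrentRequests = maxConcurrentRequests;
        _maxQueueCapacity = requestQueueLimit;
        _freeServerSpots = maxConcurrentRequests;
    }

    public Task<bool> TryEnterAsync()
    {
        lock (_lock)
        {
            if (_freeServerSpots > 0)
            {
                _freeServerSpots--;
                return Task.FromResult(true); // a slot is free: run now
            }

            if (_maxQueueCapacity == 0)
                return Task.FromResult(false); // no queue at all: reject right away

            if (_waiting.Count == _maxQueueCapacity)
            {
                // Queue full: reject the oldest waiter to make room for this one.
                _waiting.First.Value.TrySetResult(false);
                _waiting.RemoveFirst();
            }

            var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            _waiting.AddLast(tcs);
            return tcs.Task;
        }
    }

    public void OnExit()
    {
        lock (_lock)
        {
            if (_waiting.Count == 0)
            {
                if (_freeServerSpots == _maxConcurrentRequests)
                    throw new InvalidOperationException("OnExit called more times than TryEnterAsync succeeded.");
                _freeServerSpots++; // nobody waiting: give the slot back
                return;
            }

            // Hand the freed slot to the newest waiter (LIFO).
            var newest = _waiting.Last.Value;
            _waiting.RemoveLast();
            newest.TrySetResult(true);
        }
    }
}
```

With one server slot and a queue of two, a fourth request pushes out the oldest waiter, and each OnExit then releases the newest remaining waiter first.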
Summary
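Because of how a stack works, in practice only two operations are usually performed on it: push (adding an element to the top) and pop (removing the element from the top).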
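A queue storage structure can be implemented in two ways: as a sequential queue built on an array, or as a linked queue built on a linked list.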