AspNetCore結合Redis實踐消息隊列,今後放心安全迭代

引言

  熟悉TPL Dataflow博文的朋友可能記得這是個單體程序,使用TPL Dataflow 處理工做流任務, 在使用Docker部署的過程當中, 有一個問題一直沒法迴避:html

       在單體程序部署的瞬間(服務不可用)會有少許流量沒法處理;更糟糕的狀況下,迭代部署的這個版本有問題,上線後沒法運做, 更多的流量沒有獲得處理。git

      揹負神聖使命(巨大壓力)的程序猿心生一計, 爲什麼不將單體程序改爲分佈式:增長服務ReceiverApp只接收數據,服務WebApp只處理數據。github

 

知識儲備

    消息隊列和訂閱發佈做爲老生常談的兩個知識點被反覆說起,按照JMS的規範, 官方稱爲點對點(point to point,message queue) 和 訂閱發佈(publish/subscribe,channel / topic )web

點對點

  消息生產者生產消息發送到Message Queue中,而後消費者從隊列中取出消息並消費。redis

隊列會保留消息,直到他們被消費或超時;docker

MQ支持多消費者,每一個消息只能被一個消費者處理編程

消息發送者和消費者在時間上沒有依賴性,當發送者發送消息以後, 無論消費者有沒有在運行(甚至無論有沒有消費者),都不會影響到消息被髮送到隊列api

③ 通常消費者在消費以後須要向隊列應答成功安全

若是但願發送的消息都被處理,或只能被處理一次,你應該使用p2p模型。數據結構

發佈/訂閱

  消息生產者將消息發佈到Channel,同時有多個消息消費者(訂閱)該消息。和點對點方式不一樣,發佈到 特定通道的消息會被通道訂閱者實時接收。

通道 只有暫存機制,發佈的消息只能被當前訂閱者收到。

①每一個消息能夠有多個消費者

②發佈者和消費者 有時間上依賴性, 針對某topic的訂閱者,必須先建立相應訂閱,才能消費消息

將消息發佈到通道中,而不關注訂閱者是誰;訂閱者可收聽本身感興趣的多個通道(造成Topic), 也不關注發佈者是誰。

③ 故若是沒有消費者,發佈的消息將得不處處理;

若是但願廣播的消息被實時接收,應該採用發佈-訂閱模型。

 

頭腦風暴 

Redis 內置的List數據結構亦能造成輕量級MQ的效果,Redis 原生支持發佈/訂閱 模型。

如上所述, Pub/Sub 模型 在訂閱者宕機的時候,發佈的消息得不處處理,故此模型不能用於 強業務的 數據接收和處理

本次採用的消息隊列模型:

  •    解耦業務:  新建Receiver程序做爲生產者,專一於接收併發送到隊列;原有的webapp做爲消費者專一數據處理。
  •    起到削峯填谷的做用, 若創建多個消費者webapp容器,還能造成負載均衡的效果。 

    須要關注Redis 兩個命令( 左進右出,右進左出同理):

    LPUSH  &  RPOP/BRPOP

Brpop 中的B 表示 「Block」, 是一個rpop命令的阻塞版本:若指定List沒有新元素,在給定時間內,該命令會阻塞當前redis客戶端鏈接,直到超時返回nil

編程實踐

本次使用 ASPNetCore 完成RedisMQ的實踐,引入Redis國產第三方開源庫CSRedisCore.

不使用著名的StackExchange.Redis 組件庫的緣由:

  • 以前一直使用StackExchange.Redis, 參考了不少資料,作了不少優化,並未徹底解決RedisTimeoutException問題 

  • StackExchange.Redis基於其多路複用的鏈接機制,不支持阻塞式命令, 故採用了 CSRedisCore,該庫強調了API 與Redis官方命令一致,很容易上手

生產者Receiver

 生產者使用LPush 命令向Redis List數據結構寫入消息。

------------------截取自Startup.cs-------------------------

public void ConfigureServices(IServiceCollection services)
{
  // Redis客戶端要定義成單例, 否則在大流量併發收數的時候, 會形成redis client來不及釋放。另外一方面也確認api控制器不是單例模式,
  var csredis = new CSRedisClient(Configuration.GetConnectionString("redis")+",name=receiver");
  RedisHelper.Initialization(csredis);
  services.AddSingleton(csredis);

 services.AddMvc();

}

------------------截取自數據接收Controller-------------------
[Route("batch")]
[HttpPost]
public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs)
{
  if (!ModelState.IsValid)
  throw new ArgumentException("Http Body Payload Error.");
  var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}"; 
   eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs);
   if (eqidPairs != null && eqidPairs.Any())
    RedisHelper.LPush(redisKey, eqidPairs.ToArray());
    await Task.CompletedTask;
 }

 消費者webapp

     根據以上RedisMQ思路,事件消費方式是拉取pull,故須要輪詢Redis  List數據結構,這裏使用ASPNetCore內置的BackgroundService後臺服務類實現後臺輪詢消費任務。

public class BackgroundJob : BackgroundService
    {
        private readonly IEqidPairHandler _eqidPairHandler;
        private readonly CSRedisClient[] _cSRedisClients;
        private readonly IConfiguration _conf;
        private readonly ILogger _logger;
        public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[] csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory)
        {
            _eqidPairHandler = eqidPairHandler;
            _cSRedisClients = csRedisClients;
            _conf = conf;
            _logger = loggerFactory.CreateLogger(nameof(BackgroundJob));
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Service starting");
            if (_cSRedisClients[0] == null)
            {
                _cSRedisClients[0] = new CSRedisClient(_conf.GetConnectionString("redis") + ",defaultDatabase=" + 0);
            }
            RedisHelper.Initialization(_cSRedisClients[0]);

            while (!stoppingToken.IsCancellationRequested)
            {
                var key = $"eqidpair:{DateTime.Now.ToString("yyyyMMdd")}";
                var eqidpair = RedisHelper.BRPop(5, key);
                if (eqidpair != null)
                    await _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair));
                // 強烈建議不管如何休眠一段時間,防止突發大流量致使webApp進程CPU滿載,自行根據場景設置合理休眠時間
                await Task.Delay(10, stoppingToken);
            }
            _logger.LogInformation("Service stopping");
        }
    }

最後依照引言中的部署原理圖,將Nginx,Receiver, WebApp使用docker-compose工具容器化

根據docker-compsoe up命令的用法,若容器正在運行且對應的Service Configuration或Image並未改變,該容器不會被ReCreate;

docker-compose  up指令只會重建(Service或Image變動)的容器。

If there are existing containers for a service, and the service’s configuration or image was changed after the container’s creation, docker-compose up picks up the changes by stopping and recreating the containers (preserving mounted volumes). To prevent Compose from picking up changes, use the --no-recreate flag.

作一次上線測試驗證,修改docker-compose.yml文件Web app的容器服務,docker-compose up;

僅數據處理程序WebApp容器被重建:

 Nice,分佈式改造上線,效果很明顯,如今能夠放心安全的迭代Web App數據處理程序。

做者: JulianHuang

碼甲拙見,若有問題請下方留言大膽斧正;碼字+Visio製圖,均爲原創,看官請不吝好評+關注,  ~。。~

本文歡迎轉載,請轉載頁面明顯位置註明原做者及原文連接

相關文章
相關標籤/搜索