[京東技術]聲明:本文轉載自微信公衆號「開濤的博客」,轉載務必聲明。redis
隊列在數據結構中是一種線性表,從一端插入數據,而後從另外一端刪除數據。本文目的不是講解各類隊列算法,而是在應用層面講述使用隊列能解決哪些場景問題。算法
在我開發過的系統中,不是全部的業務都必須實時處理、不是全部的請求都必須實時反饋結果給用戶、不是全部的請求/處理都必須100%處理成功、不知道誰依賴「我」的處理結果、不關心其餘系統如何處理後續業務、不須要強一致性,只需保證最終一致性便可、想要保證數據處理的有序性;此時你應該考慮使用隊列來解決這些問題。在實際開發時咱們常用隊列進行異步處理、系統解耦、數據同步、流量削峯、緩衝、限流等。sql
應用場景數據庫
異步處理:使用隊列的一個主要緣由是進行異步處理,好比用戶註冊成功後須要發送註冊成功郵件/新用戶積分/優惠券等等、緩存過時時先返回老的數據,而後異步更新緩存、異步寫日誌等;經過異步處理,能夠提高主流程響應速度,而非主流程/非重要業務能夠異步集中處理,這樣還能夠將任務聚合而後批量處理;所以可使用消息隊列/任務隊列來進行異步處理。緩存
系統解耦:好比用戶成功支付完成訂單後,須要通知生產配貨系統、發票系統、庫存系統、推薦系統、搜索系統、風控系統等進行業務處理;而將來須要添加/支持哪些業務是不清楚的,並且這些業務處理不須要實時處理、不須要強一致,只須要最終一致性便可,所以能夠經過消息隊列/任務隊列進行系統解耦。服務器
數據同步:好比想把Mysql變動的數據同步到Redis、或者將Mysql數據同步到Mongodb、或者機房間數據同步、或者主從數據同步等,此時能夠考慮使用如databus、canal、otter。使用數據總線隊列進行數據同步的好處是能夠保證數據修改的有序性。微信
流量削峯:系統瓶頸通常在數據庫上,好比扣減庫存、下單等;此時能夠考慮使用隊列將變動請求暫時放入隊列,經過緩存+隊列暫存的方式將數據庫流量削峯;還有如秒殺系統,下單服務會是該系統的瓶頸,此時會使用隊列進行排隊和限流,從而保護下單服務。經過隊列暫存或者隊列限流來削峯。網絡
好比減庫存,能夠考慮這樣設計:數據結構
直接在Redis中扣減,而後記錄下扣減日誌(FIFO隊列),經過Worker去同步到DB。併發
實際隊列的應用場景仍是很是多的,本文列舉了筆者遇到過比較多的場景。
緩衝區隊列
典型的如Log4j的日誌緩衝區,當咱們使用log4j記錄日誌時,能夠配置字節緩衝區,字節緩存區滿時會當即同步到磁盤(flush操做)。Log4j使用BufferedWriter實現的;此模式不是異步寫,在緩衝區滿的時候仍是會阻塞主線程。若是須要異步模式可使用AsyncAppender,而後經過bufferSize控制日誌事件緩衝區大小。
經過緩衝區隊列能夠實現:批量處理、異步處理。
任務隊列
使用任務隊列將一些不須要與主線程同步執行的任務扔到任務隊列異步處理便可;筆者用的最多的是線程池任務隊列(默認LinkedBlockingQueue)和Disruptor任務隊列(RingBuffer)。如刷數據時,將任務扔到隊列異步處理便可,處理成功後再異步通知用戶;還有如刪除SKU操做,用戶請求時直接將任務分解並扔到隊列,異步處理,處理成功後異步通知用戶便可;還有如查詢聚合,將多個可並行處理的任務扔到隊列而後等待最慢的一個返回。若是使用的是內存任務隊列請記住可能存在系統重啓等問題形成的數據丟失。
經過任務隊列能夠實現:異步處理、任務分解/聚合處理。
注:JDK7提供了ExecutorService的新的實現ForkJoinPool,其提供了Work-stealing機制,能夠更好地提高併發效率。
在使用Executors.newFixedThreadPool時,其沒有設置隊列大小(默認Integer.MAX_VALUE),若是有大量任務被緩存到LinkedBlockingQueue中等待線程執行,會出現GC慢等問題,形成系統響應慢甚至OOM。所以在使用線程池時候,要指定隊列大小並設置合理的RejectedExecutionHandler;要記錄請求來源的參數方便定位引起問題的源頭。
消息隊列
筆者所在公司使用的是自研的JMQ;開源的有ActiveMQ、Kafka、Redis。使用消息隊列存儲各業務數據,其餘系統根據須要訂閱便可。常見的模式是:點對點(一個消息只有一個消費者)、發佈訂閱(一個消息能夠有多個消費者);而經常使用的是發佈訂閱模式。
好比用戶註冊成功、修改商品數據、訂單狀態變動等都應該將變動發送到消息隊列,從而其餘系統根據須要訂閱該消息,而後按照本身的需求進行業務邏輯開發。
在添加新功能時,消息消費者只須要訂閱該消息,而後開發相應的業務邏輯,消息生產者根本不關心你怎麼使用消息和你作什麼業務處理。
同步調用,添加什麼新功能都須要到用戶系統提需求。其中一個服務出現問題了,整個服務就不可用了。
消息隊列,用戶系統只須要發佈用戶註冊成功的消息便可,相關係統訂閱該消息,而後執行相關的業務邏輯。相關服務出問題不影響到註冊主流程。
經過消息隊列能夠實現:異步處理、系統解耦。
請求隊列
請求隊列是指如在Web環境下對用戶請求排隊,從而進行一些特殊控制:流量控制、請求分級、請求隔離;如將請求按照功能劃分到不一樣的隊列,從而使得不一樣的隊列出現問題後相互不影響;還能夠對請求分級,一些重要請求能夠優先處理(發展到必定程度應將功能物理分離);還有服務器處理能力有限,在接近服務器瓶頸時須要考慮限流,最簡單的限流時丟棄處理不了的請求,此時可使用隊列進行流量控制。
數據總線隊列
通常消息隊列中的消息都是業務維度的,好比業務鍵或者業務狀態等,好比哪一個SKU變動了,而有些訂閱者須要再查一遍來獲取最新的修改數據(好比緩存同步);經過現有的消息隊列方式的缺點是很難只進行修改部分的推送和保證數據有序性。而此種場景比較適合使用數據總線隊列實現。如數據庫數據修改後須要同步數據到緩存,或者須要將一個機房數據同步到另外一個機房,只是數據維度的同步,此時應該使用數據總線隊列如canal、otter、databus;使用數據總線隊列的好處是能夠保證數據的有序性。
混合隊列
在《構建需求響應式億級商品詳情頁》曾介紹過該方式的隊列,使用混合隊列來解決實際問題。
此處MQ是使用京東自研的JMQ,消息是可靠持久化存儲的;應用會按照不一樣的維度發佈消息到JMQ;下游應用接收到該消息後會放入到Redis,使用Redis List來存儲這些任務;應用將Redis消息消費處理後,會按照不一樣的維度聚合商品消息而後再次發送出去。
使用Redis隊列的主要緣由是想提高消息堆積能力和併發處理能力。另外在使用Redis構建消息隊列時須要考慮網絡抖動形成的消息丟失問題,由於Redis是沒有回滾事務的,或者說是確認機制。咱們使用以下方式防止消息丟失:
try {
id = queueRedis.opsForList().rightPopAndLeftPush(queueName, processingQueueName);
} catch (Exception e) {
//發生了網絡異常,須要把processing中的id再放回到waiting queue中
String msg = queueName + " to " + processingQueueName + " rpoplpush error";
LOG.error(msg, e);
//報警代碼
}
而對於失敗咱們會進行重試三次,重試失敗後放入失敗隊列,而失敗隊列是具備防重功能的(從本地隊列和失敗隊列排重),使用的是Redis Lua腳本實現:
static EventQueueScript ADD_TO_FAIL_QUEUE_REDIS_SCRIPT = new EventQueueScript(
"redis.call('lrem', KEYS[1], 1, ARGV[1]) redis.call('lrem', KEYS[2], 1, ARGV[1]) return redis.call('lpush', KEYS[2], ARGV[1])"
);
Redis做者Antirez開發的內存分佈式消息隊列Disque是將來更好的內存消息隊列選擇。
其餘
優先級隊列:在實際開發時確定有些任務是緊急的,此時應該優先處理緊急的任務;因此請考慮對隊列進行分級。
副本隊列:在進行一些系統重構或者上新的功能時,若是沒有足夠的信心保證業務邏輯正確,能夠考慮存儲一份隊列的副本(好比1小時、1天的),從而當業務出現問題時能夠對這些消息進行回放。
鏡像隊列:每一個隊列不會無限制訂閱數量,必定會有一個極限的;當到達極限時請考慮使用鏡像隊列方式解決該問題。
隊列併發數:不一樣隊列實現,隊列服務端併發鏈接數是不同的;必定不是增大隊列併發鏈接數消費能力也隨着增長;也不會由於增長了消費服務器消費併發能力也隨着增長,須要根據實際狀況來設置合理的併發鏈接數。
推仍是拉:消息體內容不是越全越好,須要根據具體業務設計消息體;若有些系統依賴商品變動消息(只有一個SKU)、有些系統依賴商品狀態消息(SKU、狀態)、有些系統依賴商品屬性變動消息(SKU、變動的屬性)等,若是讓全部系統都消費商品變動消息,那麼這些系統都會調用商品查詢服務拉一下最新的商品信息而後進行處理。所以要根據實際狀況來決定是使用推送方式(將系統須要的全部信息推過去)仍是拉取方式(只推送ID,而後再查一遍)。
消息合併:若是消息寫入量很是大,應該考慮將消息合併寫,能夠"寫應用本地磁盤隊列"-->「同步本地磁盤隊列到消息中間件」;同步時能夠根據需求制定同步策略,如1秒同步1次。