首先作簡單的引入。redis
MQ主要是用來:服務器
目前使用的較多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。
網上的資源對各類狀況都有詳細的解釋,在此不作過多贅述。本文
僅介紹如何使用Redis實現輕量級MQ的過程。數據結構
在業務的實現過程當中,就算沒有大量的流量,解耦和異步化幾乎也是到處可用,此時MQ就顯得尤其重要。但與此同時MQ也是一個蠻重的組件,例如咱們若是用RabbitMQ就必須爲它搭建一個服務器,同時若是要考慮可用性,就要爲服務端創建一個集羣,並且在生產若是有問題也須要查找功能。在中小型業務的開發過程當中,可能業務的其餘整個實現都沒這個重。太重的組件服務會成倍增長工做量。
所幸的是,Redis提供的list數據結構很是適合作消息隊列。
可是如何實現即時消費?如何實現ack機制?這些是實現的關鍵所在。app
網上所流傳的方法是使用Redis中list的操做BLPOP或BRPOP,即列表的阻塞式(blocking)彈出。
讓咱們來看看阻塞式彈出的使用方式:異步
BRPOP key [key ...] timeout
此命令的說明是:ide
一、當給定列表內沒有任何元素可供彈出的時候,鏈接將被 BRPOP 命令阻塞,直到等待超時或發現可彈出元素爲止。
二、當給定多個key參數時,按參數 key 的前後順序依次檢查各個列表,彈出第一個非空列表的尾部元素。
另外,BRPOP 除了彈出元素的位置和 BLPOP 不一樣以外,其餘表現一致。ui
以此來看,列表的阻塞式彈出有兩個特色:spa
一、若是list中沒有任務的時候,該鏈接將會被阻塞
二、鏈接的阻塞有一個超時時間,當超時時間設置爲0時,便可無限等待,直到彈出消息
由此看來,此方式是可行的,但此爲傳統的觀察者模式,業務簡單則可以使用,如A的任務只由B去執行。但若是A和Z的任務,B和C都能執行,那使用這種方式就相形見肘。這個時候就應該使用訂閱/發佈模式,使業務系統更加清晰。
好在Redis也支持Pub/Sub(發佈/訂閱)。在消息A入隊list的同時發佈(PUBLISH)消息B到頻道channel,此時已經訂閱channel的worker就接收到了消息B,知道了list中有消息A進入,便可循環lpop或rpop來消費list中的消息。流程以下:線程
其中的worker能夠是單獨的線程,也能夠是獨立的服務,其充當了Consumer和業務處理者角色。下面作實例說明。code
示例場景爲:worker要作同步文件功能,等到有文件生成時立馬同步。
首先開啓一個線程表明worker,來訂閱頻道channel:
@Service public class SubscribeService { @Resource private RedisService redisService; @Resource private SynListener synListener;//訂閱者 @PostConstruct public void subscribe() { new Thread(new Runnable() { @Override public void run() { LogCvt.info("服務已訂閱頻道:{}", channel); redisService.subscribe(synListener, channel); } }).start(); } }
代碼中的SynListener即爲所聲明的訂閱者,channel爲訂閱的頻道名稱,具體的訂閱邏輯以下:
@Service public class SynListener extends JedisPubSub { @Resource private DispatchMessageHandler dispatchMessageHandler; @Override public void onMessage(String channel, String message) { LogCvt.info("channel:{},receives message:{}",channel,message); try { //處理業務(同步文件) dispatchMessageHandler.synFile(); } catch (Exception e) { LogCvt.error(e.getMessage(),e); } } }
處理業務的時候,就去list中去消費消息:
@Service public class DispatchMessageHandler { @Resource private RedisService redisService; @Resource private MessageHandler messageHandler; public void synFile(){ while(true){ try { String message = redisService.lpop(RedisKeyUtil.syn_file_queue_key()); if (null == message){ break; } Thread.currentThread().setName(Tools.uuid()); // 隊列數據處理 messageHandler.synfile(message); } catch (Exception e) { LogCvt.error(e.getMessage(),e); } } } }
這樣咱們就達到了消息的實時消費的目的。
ack,即消息確認機制(Acknowledge)。
首先來看RabbitMQ的ack機制:
那麼在咱們用Redis實現消息隊列的ack機制的時候該怎麼作呢?
須要注意兩點:
上面第一點能夠在業務中完成,即失敗後執行回滾消息。
(該方案主要解決worker掛掉的狀況)
Redis做爲消息隊列是有很大侷限性的。由於其主要特性及用途決定它只能實現輕量級的消息隊列。寫在最後:沒有絕對好的技術,只有對業務最友好的技術,謹此獻給全部developer。