咱們平時說的消息隊列是指:RabbitMQ,RockerMQ,ActiveMQ 以及大數據的 Kafka,這是咱們常見的也是很是專業的消息中間件,裏面提供了豐富的功能;
可是當我須要使用消息中間件時,並不是都須要使用以上專業的消息中間件,好比:咱們只有一個消息隊列,只有一個消費者,那就不必使用上面很是專業的消息中間件,這種場景能夠直接使用 Redis 來作消息隊列(Redis 消息隊列 並不專業,沒有不少高級特性,適用於簡單場景)若對消息可靠性有極高的要求,就不適合使用 Redis;java
一、普通消息隊列 Redis 做爲消息中間件,可使用 Redis List 數據結構便可實現,使用 lpus/rpush 實現消息入隊,使用 lpop/rpop 實現消息出隊linux
127.0.0.1:6379> LPUSH wdh01-queue java linux oracle hive (integer) 4 127.0.0.1:6379> length wdh01-queue (error) ERR unknown command 'length' 127.0.0.1:6379> LLEN wdh01-queue (integer) 4 127.0.0.1:6379> LPOP wdh01-queue "hive" 127.0.0.1:6379> RPOP wdh01-queue "java" 127.0.0.1:6379>
在客戶端維護一個死循環,讀取消息並處理,若隊列有消息則獲取,不然陷入死循環,直到下一次有消息進行處理;這種死循環會形成大量資源浪費;此時可使用 blpop/nrpop (無消息進入阻塞狀態,有效及喚醒,無延遲)redis
二、延遲消息隊列
延遲消息隊列能夠經過 Zset 實現,Zset 中有一個 score ,咱們可使用時間做爲 score,將 value 存到 Redis 中,經過輪詢方式不斷讀取消息
首先 消息是一個字符串直接發送便可,若消息是對象須要對其進行序列化,我這使用 JSON 實現對象序列化和反序列化,項目中需添加下 JSON 依賴;數據結構
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.10.3</version> </dependency> ,
構造消息對象oracle
public class Wdh01Message { private String id; private Object date; public String getId() { return id; } public void setId(String id) { this.id = id; } public Object getDate() { return date; } public void setDate(Object date) { this.date = date; } @Override public String toString() { return "Wdh01Message{" + "id='" + id + '\'' + ", date=" + date + '}'; } }
封裝消息隊列app
public class DelayMsgQueue { private Jedis jedis; private String queue; public DelayMsgQueue(Jedis jedis, String queue) { this.jedis = jedis; this.queue = queue; } /** * 消息入隊 * * @param object 要入隊的消息 */ public void queue(Object object) { // 構造 msg Wdh01Message msg = new Wdh01Message(); msg.setId(UUID.randomUUID().toString()); msg.setDate(object); //序列化 try { String s = new ObjectMapper().writeValueAsString(msg); //消息發送,延遲 5s System.out.println("--- msg push ---- " + new Date()); jedis.zadd(queue, System.currentTimeMillis() + 5000, s); } catch (Exception e) { e.printStackTrace(); } } /** * 消息消費 */ public void loop() { while (!Thread.interrupted()) { //讀取時間 在 0 ~ 當前 score 的消息,每次讀取 一條 Set<String> strings = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1); if (strings.isEmpty()) { // 消息爲空,休息一下,稍後重試 try { Thread.sleep(500); } catch (Exception e) { break; } continue; } // 讀取到消息,直接讀取 String next = strings.iterator().next(); // 移除消息 if (jedis.zrem(queue, next) > 0) { // 強盜消息,處理業務 try { // 反序列化 Wdh01Message wdh01 = new ObjectMapper().readValue(next, Wdh01Message.class); System.out.println("--- get msg ----"+ new Date() + " -- "+ wdh01); } catch (JsonProcessingException e) { e.printStackTrace(); } } } } }
測試dom
public class DelayMsgTest { public static void main(String[] args) { Redis redis = new Redis(); redis.execute(jedis -> { DelayMsgQueue delayMsgQueue = new DelayMsgQueue(jedis, "wdh-queue"); // 消息生產着 Thread produer = new Thread() { @Override public void run() { for (int i = 0; i < 5; i++) { delayMsgQueue.queue("wdh01 --- " + i); } } }; // 消息消費者 Thread consumer = new Thread() { @Override public void run() { delayMsgQueue.loop(); } }; // 啓動 produer.start(); consumer.start(); // 休息 7s try { Thread.sleep(20000); consumer.interrupt(); } catch (InterruptedException e) { e.printStackTrace(); } }); } }