Redis 應用--延遲消息隊列

咱們平時說的消息隊列是指:RabbitMQ,RockerMQ,ActiveMQ 以及大數據的 Kafka,這是咱們常見的也是很是專業的消息中間件,裏面提供了豐富的功能;
可是當我須要使用消息中間件時,並不是都須要使用以上專業的消息中間件,好比:咱們只有一個消息隊列,只有一個消費者,那就不必使用上面很是專業的消息中間件,這種場景能夠直接使用 Redis 來作消息隊列(Redis 消息隊列 並不專業,沒有不少高級特性,適用於簡單場景)若對消息可靠性有極高的要求,就不適合使用 Redis;java

一、普通消息隊列 Redis 做爲消息中間件,可使用 Redis List 數據結構便可實現,使用 lpus/rpush 實現消息入隊,使用 lpop/rpop 實現消息出隊linux

127.0.0.1:6379> LPUSH wdh01-queue java linux oracle hive
(integer) 4
127.0.0.1:6379> length wdh01-queue
(error) ERR unknown command 'length'
127.0.0.1:6379> LLEN wdh01-queue
(integer) 4
127.0.0.1:6379> LPOP wdh01-queue
"hive"
127.0.0.1:6379> RPOP wdh01-queue
"java"
127.0.0.1:6379> 

 在客戶端維護一個死循環,讀取消息並處理,若隊列有消息則獲取,不然陷入死循環,直到下一次有消息進行處理;這種死循環會形成大量資源浪費;此時可使用 blpop/nrpop (無消息進入阻塞狀態,有效及喚醒,無延遲)redis

二、延遲消息隊列
延遲消息隊列能夠經過 Zset 實現,Zset 中有一個 score ,咱們可使用時間做爲 score,將 value 存到 Redis 中,經過輪詢方式不斷讀取消息
首先 消息是一個字符串直接發送便可,若消息是對象須要對其進行序列化,我這使用 JSON 實現對象序列化和反序列化,項目中需添加下 JSON 依賴;數據結構

<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.3</version>
</dependency>

構造消息對象oracle

public class Wdh01Message {
    private String id;
    private Object date;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Object getDate() {
        return date;
    }

    public void setDate(Object date) {
        this.date = date;
    }

    @Override
    public String toString() {
        return "Wdh01Message{" +
                "id='" + id + '\'' +
                ", date=" + date +
                '}';
    }
}

封裝消息隊列app

public class DelayMsgQueue {
    private Jedis jedis;
    private String queue;

    public DelayMsgQueue(Jedis jedis, String queue) {
        this.jedis = jedis;
        this.queue = queue;
    }

    /**
     * 消息入隊
     *
     * @param object 要入隊的消息
     */
    public void queue(Object object) {
        // 構造 msg
        Wdh01Message msg = new Wdh01Message();
        msg.setId(UUID.randomUUID().toString());
        msg.setDate(object);
        //序列化
        try {
            String s = new ObjectMapper().writeValueAsString(msg);
            //消息發送,延遲 5s
            System.out.println("--- msg push ---- " + new Date());
            jedis.zadd(queue, System.currentTimeMillis() + 5000, s);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 消息消費
     */
    public void loop() {
        while (!Thread.interrupted()) {
            //讀取時間 在 0 ~ 當前 score 的消息,每次讀取 一條
            Set<String> strings = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
            if (strings.isEmpty()) {
                // 消息爲空,休息一下,稍後重試
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                    break;
                }
                continue;
            }
            // 讀取到消息,直接讀取
            String next = strings.iterator().next();
            // 移除消息
            if (jedis.zrem(queue, next) > 0) {
                // 強盜消息,處理業務
                try {
                    // 反序列化
                    Wdh01Message wdh01 = new ObjectMapper().readValue(next, Wdh01Message.class);
                    System.out.println("--- get msg ----"+ new Date() + " -- "+ wdh01);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

測試dom

public class DelayMsgTest {
    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.execute(jedis -> {
            DelayMsgQueue delayMsgQueue = new DelayMsgQueue(jedis, "wdh-queue");

            // 消息生產着
            Thread produer = new Thread() {
                @Override
                public void run() {
                    for (int i = 0; i < 5; i++) {
                        delayMsgQueue.queue("wdh01 --- " + i);
                    }
                }
            };
            // 消息消費者
            Thread consumer = new Thread() {
                @Override
                public void run() {
                    delayMsgQueue.loop();
                }
            };
            // 啓動
            produer.start();
            consumer.start();
            // 休息 7s
            try {
                Thread.sleep(20000);
                consumer.interrupt();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

    }
}

 

相關文章
相關標籤/搜索