Redis系列(二)--消息隊列

1.概念node

Redis 的 list(列表) 數據結構經常使用來做爲異步消息隊列使用,使用rpush/lpush操做入隊列,使用lpop 和 rpop來出隊列。Redis 的消息隊列不是專業的消息隊列,它沒有很是多的高級特性,沒有 ack 保證,若是對消息的可靠性有着極致的追求,那麼它就不適合使用。命令方式:web

生產數據:redis

rpush test:aaron:queue apple banana pear

 

消費數據:spring

服務器

lpop test:aaron:queue

數據結構

rpop test:aaron:queue

注意:注意執行命令的時候,須要執行三次,才能所有獲取到數據。app

 

思考1:隊列空了怎麼辦?異步

 

客戶端是經過隊列的 pop 操做來獲取消息,而後進行處理。處理完了再接着獲取消息,再進行處理。如此循環往復,這即是做爲隊列消費者的客戶端的生命週期。ide

 

但是若是隊列空了,客戶端就會陷入 pop 的死循環,不停地 pop,沒有數據,接着再 pop,又沒有數據。這就是浪費生命的空輪詢。空輪詢不但拉高了客戶端的 CPU,redis 的 QPS 也會被拉高,若是這樣空輪詢的客戶端有幾十來個,Redis 的慢查詢可能會顯著增多。函數

 

一般咱們使用 sleep 來解決這個問題,讓線程睡一會,睡個 1s 鍾就能夠了。不但客戶端的 CPU 能降下來,Redis 的 QPS 也降下來了。

 

解決方式:

 

有沒有什麼辦法能顯著下降延遲呢?你固然能夠很快想到:那就把睡覺的時間縮短點。這種方式固然能夠,不過有沒有更好的解決方案呢?固然也有,那就是 blpop/brpop

 

這兩個指令的前綴字符b表明的是blocking,也就是阻塞讀。

 

阻塞讀在隊列沒有數據的時候,會當即進入休眠狀態,一旦數據到來,則馬上醒過來。消息的延遲幾乎爲零。用blpop/brpop替代前面的lpop/rpop,就完美解決了上面的問題。

 

思考2:空閒鏈接自動斷開

 

你覺得上面的方案真的很完美麼?先別急着開心,其實他還有個問題須要解決。

 

什麼問題?—— 空閒鏈接的問題。

 

若是線程一直阻塞在哪裏,Redis 的客戶端鏈接就成了閒置鏈接,閒置太久,服務器通常會主動斷開鏈接,減小閒置資源佔用。這個時候blpop/brpop會拋出異常來。

 

因此編寫客戶端消費者的時候要當心,注意捕獲異常,還要重試。

2.代碼

  1. py代碼




import redis#redis 鏈接pool = redis.ConnectionPool(host='127.0.0.1', port=6379)r = redis.Redis(connection_pool=pool)#消息隊列-def input_mq():#if 條件語句if r :count = 0while( count < 3):r.rpush("notify:queue","apple" + str(count))count = count + 1print(r.llen("notify:queue"))#消息隊列-延遲隊列def out_mq():#if 條件語句if r :count = 0;while( count < 3):print(r.brpop("notify:queue"))count = count + 1print(count)#主函數,執行行數if __name__ == '__main__':    input_mq()    out_mq()

    2 Java代碼

Java代碼

    2.1 目錄結構

 

    2.2 pom文件和配置

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
spring:redis:database: 0 #索引(默認爲0)host: localhost #地址port: 6379 #端口號#password:  #鏈接密碼(默認空)pool:max-idle: 8 #鏈接池中的最大空閒鏈接min-idle: 0 #鏈接池中的最小空閒鏈接max-active: 8 #鏈接池最大鏈接數(使用負值表示沒有限制)max-wait: -1 #鏈接池最大阻塞等待時間(使用負值表示沒有限制)#sentinel:#master: mymaster # 哨兵監聽的Redis server的名稱#nodes:#127.0.0.1:26379,127.0.0.1:26479,127.0.0.1:26579 #哨兵的配置列表timeout: 5000 #鏈接超時時間(毫秒)server:port: 8000

        2.3生產者




package com.example.redis.zfr.demoredis.mq;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;@Controllerpublic class RedisController {@Autowiredprivate RedisTemplate<String, String> redisTemplate;/*** 發佈消息* @param id* @return*/@RequestMapping("/sendMessage/{id}")public String sendMessage(@PathVariable String id) {redisTemplate.convertAndSend("msg1","哈哈哈,mq 繁榮Aaron 你好"+id);        //主要爲了測試多個主題的發送redisTemplate.convertAndSend("msg","哈哈哈,mq 繁榮Aaron 你好"+id);return "";}}

        2.4 消費者




package com.example.redis.zfr.demoredis.mq;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.connection.MessageListener;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.RedisSerializer;import org.springframework.stereotype.Component;@Componentpublic class RedisMessage implements MessageListener{@Autowiredprivate RedisTemplate<Object, Object> redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {RedisSerializer<String> serializer = redisTemplate.getStringSerializer();String msg = serializer.deserialize(message.getBody());System.out.println("接收到的消息是:" + msg);}}

        2.5 配置







package com.example.redis.zfr.demoredis.mq;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.listener.PatternTopic;import org.springframework.data.redis.listener.RedisMessageListenerContainer;import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;@Configurationpublic class RedisSubConfig {/*** 建立鏈接工廠** @param connectionFactory* @param adapter* @return*/@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter adapter,MessageListenerAdapter adapter1) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(adapter, new PatternTopic("msg"));        //主要爲了測試多個主題的發送container.addMessageListener(adapter1,new PatternTopic("msg1"));return container;}/*** @param message* @return*/@Beanpublic MessageListenerAdapter adapter(RedisMessage message){// onMessage 若是RedisMessage 中 沒有實現接口,這個參數必須跟RedisMessage中的讀取信息的方法名稱同樣return new MessageListenerAdapter(message, "onMessage");}    /**主要爲了測試多個主題的發送* @param message* @return*/@Beanpublic MessageListenerAdapter adapter1(RedisMessage1 message){// onMessage 若是RedisMessage 中 沒有實現接口,這個參數必須跟RedisMessage中的讀取信息的方法名稱同樣return new MessageListenerAdapter(message, "onMessage");}}

3.思考

思考問題

Redis 做爲消息隊列爲何不能保證 100% 的可靠性?

解決

pop出消息後,list 中就沒這個消息了,若是處理消息的程序拿到消息還未處理就掛掉了,那消息就丟失了,因此是不可靠隊列。https://redis.io/commands/rpoplpush 這個能夠實現可靠隊列。

相關文章
相關標籤/搜索