GitHub 9.4k Star 的Java工程師成神之路 ,不來了解一下嗎?html
GitHub 9.4k Star 的Java工程師成神之路 ,真的不來了解一下嗎?java
GitHub 9.4k Star 的Java工程師成神之路 ,真的肯定不來了解一下嗎?git
衆所周知,redis是一個高性能的key-value數據庫,在NoSQL數據庫市場上,redis本身就佔據了將近半壁江山,足以見到其強大之處。同時,因爲redis的單線程特性,咱們能夠將其用做爲一個消息隊列。本篇文章就來說講如何將redis整合到spring boot中,並用做消息隊列的……github
「消息隊列」是在消息的傳輸過程當中保存消息的容器。——《百度百科》web
消息咱們能夠理解爲在計算機中或在整個計算機網絡中傳遞的數據。redis
隊列是咱們在學習數據結構的時候學習的基本數據結構之一,它具備先進先出的特性。spring
因此,消息隊列就是一個保存消息的容器,它具備先進先出的特性。數據庫
下面一張圖咱們來簡單瞭解一下消息隊列apache
由上圖能夠看到,消息隊列充當了一箇中間人的角色,咱們能夠經過操做這個消息隊列來保證咱們的系統穩定。json
Java環境:jdk1.8
spring boot版本:2.2.1.RELEASE
redis-server版本:3.2.100
這裏只展現與redis相關的依賴,
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
複製代碼
這裏解釋一下這兩個依賴:
這裏只展現與redis相關的配置
# redis所在的的地址
spring.redis.host=localhost
# redis數據庫索引,從0開始,能夠從redis的可視化客戶端查看
spring.redis.database=1
# redis的端口,默認爲6379
spring.redis.port=6379
# redis的密碼
spring.redis.password=
# 鏈接redis的超時時間(ms),默認是2000
spring.redis.timeout=5000
# 鏈接池最大鏈接數
spring.redis.jedis.pool.max-active=16
# 鏈接池最小空閒鏈接
spring.redis.jedis.pool.min-idle=0
# 鏈接池最大空閒鏈接
spring.redis.jedis.pool.max-idle=16
# 鏈接池最大阻塞等待時間(負數表示沒有限制)
spring.redis.jedis.pool.max-wait=-1
# 鏈接redis的客戶端名
spring.redis.client-name=mall
複製代碼
redis用做消息隊列,其在spring boot中的主要表現爲一個RedisTemplate.convertAndSend()
方法和一個MessageListener
接口。因此咱們要在IOC容器中注入一個RedisTemplate
和一個實現了MessageListener
接口的類。話很少說,先看代碼
配置RedisTemplate的主要目的是配置序列化方式以解決亂碼問題,同時合理配置序列化方式還能下降一點性能開銷。
/**
* 配置RedisTemplate,解決亂碼問題
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
LOGGER.debug("redis序列化配置開始");
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
// string序列化方式
RedisSerializer serializer = new GenericJackson2JsonRedisSerializer();
// 設置默認序列化方式
template.setDefaultSerializer(serializer);
template.setKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
LOGGER.debug("redis序列化配置結束");
return template;
}
複製代碼
代碼第12行,咱們配置默認的序列化方式爲GenericJackson2JsonRedisSerializer
代碼第13行,咱們配置鍵的序列化方式爲StringRedisSerializer
代碼第14行,咱們配置哈希表的值的序列化方式爲GenericJackson2JsonRedisSerializer
序列化方式 | 介紹 |
---|---|
StringRedisSerializer |
將對象序列化爲字符串,可是經測試,沒法序列化對象,通常用在key上 |
OxmSerializer |
將對象序列化爲xml性質,本質上是字符串 |
ByteArrayRedisSerializer |
默認序列化方式,將對象序列化爲二進制字節,可是須要對象實現Serializable接口 |
GenericFastJsonRedisSerializer |
json序列化,使用fastjson序列化方式序列化對象 |
GenericJackson2JsonRedisSerializer |
json序列化,使用jackson序列化方式序列化對象 |
上面說了,與redis隊列監聽器相關的類爲一個名爲MessageListener
的接口,下面是該接口的源碼
public interface MessageListener {
void onMessage(Message message, @Nullable byte[] pattern);
}
複製代碼
能夠看到,該接口僅有一個onMessage(Message message, @Nullable byte[] pattern)
方法,該方法即是監聽到隊列中消息後的回調方法。下面解釋一下這兩個參數:
byte[] getBody()
以二進制形式獲取消息體byte[] getChannel()
以二進制形式獲取消息通道message.getChannel()
返回值相同介紹完接口,咱們來實現一個簡單的redis隊列監聽器
@Component
public class RedisListener implement MessageListener{
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
@Override
public void onMessage(Message message,byte[] pattern){
LOGGER.debug("從消息通道={}監聽到消息",new String(pattern));
LOGGER.debug("從消息通道={}監聽到消息",new String(message.getChannel()));
LOGGER.debug("元消息={}",new String(message.getBody()));
// 新建一個用於反序列化的對象,注意這裏的對象要和前面配置的同樣
// 由於我前面設置的默認序列化方式爲GenericJackson2JsonRedisSerializer
// 因此這裏的實現方式爲GenericJackson2JsonRedisSerializer
RedisSerializer serializer=new GenericJackson2JsonRedisSerializer();
LOGGER.debug("反序列化後的消息={}",serializer.deserialize(message.getBody()));
}
}
複製代碼
代碼很簡單,就是輸出參數中包含的關鍵信息。須要注意的是,RedisSerializer
的實現要與上面配置的序列化方式一致。
隊列監聽器實現完之後,咱們還須要將這個監聽器添加到redis隊列監聽器容器中,代碼以下:
@Bean
public public RedisMessageListenerContainer container(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(redisListener, new PatternTopic("demo-channel"));
return container;
}
複製代碼
這幾行代碼大概意思就是新建一個Redis消息監聽器容器,而後將監聽器和管道名想綁定,最後返回這個容器。
這裏要注意的是,這個管道名和下面將要說的推送消息時的管道名要一致,否則監聽器監聽不到消息。
上面咱們配置了RedisTemplate將要在這裏使用到。
代碼以下:
@Service
public class Publisher{
@Autowrite
private RedisTemplate redis;
public void publish(Object msg){
redis.convertAndSend("demo-channel",msg);
}
}
複製代碼
關鍵代碼爲第7行,redis.convertAndSend()
這個方法的做用爲,向某個通道(參數1)推送一條消息(第二個參數)。
這裏仍是要注意上面所說的,生產者和消費者的通道名要相同。
至此,消息隊列的生產者和消費者已經所有編寫完成。
在我添加了spring-boot-starter-log4j2
依賴並在spring-boot-starter-web
中排除了spring-boot-starter-logging
後,運行項目,仍是會提示下面的錯誤:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:.....m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:.....m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
複製代碼
這個錯誤就是maven中有多個日誌框架致使的。後來經過依賴分析,發如今spring-boot-starter-data-redis
中,也依賴了spring-boot-starter-logging
,解決辦法也很簡單,下面貼出詳細代碼
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
複製代碼
redis隊列監聽器的監聽機制是:使用一個線程監聽隊列,隊列有未消費的消息則取出消息並生成一個新的線程來消費消息。若是你還記得,我開頭說的是因爲redis單線程特性,所以咱們用它來作消息隊列,可是若是監聽器每次接受一個消息就生成新的線程來消費信息的話,這樣就徹底沒有使用到redis的單線程特性,同時還會產生線程安全問題。
最簡單的辦法莫過於爲onMessage()
方法加鎖,這樣簡單粗暴卻頗有用,不過這種方式沒法控制隊列監聽的速率,且無限制的創造線程最終會致使系統資源被佔光。
那如何解決這種狀況呢?線程池。
在將監聽器添加到容器的配置的時候,RedisMessageListenerContainer
類中有一個方法setTaskExecutor(Executor taskExecutor)
能夠爲監聽容器配置線程池。配置線程池之後,全部的線程都會由該線程池產生,由此,咱們能夠經過調節線程池來控制隊列監聽的速率。
單一消費者的問題相比於多個消費者來講仍是較爲簡單,由於Java內置的鎖都是隻能控制本身程序的運行,不能干擾其餘的程序的運行;然而如今不少時候咱們都是在分佈式環境下進行開發,這時處理多個消費者的狀況就頗有意義了。
那麼這種問題如何解決呢?分佈式鎖。
下面來簡要科普一下什麼是分佈式鎖:
分佈式鎖是指在分佈式環境下,同一時間只有一個客戶端可以從某個共享環境中(例如redis)獲取到鎖,只有獲取到鎖的客戶端才能執行程序。
而後分佈式鎖通常要知足:排他性(即同一時間只有一個客戶端可以獲取到鎖)、避免死鎖(即超時後自動釋放)、高可用(即獲取或釋放鎖的機制必須高可用且性能佳)
上面講依賴的時候,咱們導入了一個spring-integration-redis
依賴,這個依賴裏面包含了不少實用的工具類,而咱們接下來要講的分佈式鎖就是這個依賴下面的一個工具包RedisLockRegistry
。
首先講一下如何使用,導入了依賴之後,首先配置一個Bean
@Bean
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory factory) {
return new RedisLockRegistry(factory, "demo-lock",60);
}
複製代碼
RedisLockRegistry
的構造函數,第一個參數是redis鏈接池,第二個參數是鎖的前綴,即取出的鎖,鍵名爲「demo-lock:KEY_NAME」,第三個參數爲鎖的過時時間(秒),默認爲60秒,當持有鎖超過該時間後自動過時。
使用鎖的方法,下面是對監聽器的修改
@Component
public class RedisListener implement MessageListener{
@Autowrite
private RedisLockRegistry redisLockRegistry;
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
@Override
public void onMessage(Message message,byte[] pattern){
Lock lock=redisLockRegistry.obtain("lock");
try{
lock.lock(); //上鎖
LOGGER.debug("從消息通道={}監聽到消息",new String(pattern));
LOGGER.debug("從消息通道={}監聽到消息",new String(message.getChannel()));
LOGGER.debug("元消息={}",new String(message.getBody()));
// 新建一個用於反序列化的對象,注意這裏的對象要和前面配置的同樣
// 由於我前面設置的默認序列化方式爲GenericJackson2JsonRedisSerializer
// 因此這裏的實現方式爲GenericJackson2JsonRedisSerializer
RedisSerializer serializer=new GenericJackson2JsonRedisSerializer();
LOGGER.debug("反序列化後的消息={}",serializer.deserialize(message.getBody()));
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); //解鎖
}
}
}
複製代碼
上面代碼的代碼比起前面的監聽器代碼,只是多了一個注入的RedisLockRegistry
,一個經過redisLockRegistry.obtain()
方法獲取鎖,一個加鎖一個解鎖,而後這就完成了分佈式鎖的使用。
注意這個獲取鎖的方法redisLockRegistry.obtain()
,其返回的是一個名爲RedisLock的鎖,這是一個私有內部類,它實現了Lock接口,所以咱們不能從代碼外部建立一個他的實例,只能經過obtian()方法來獲取這個鎖。