和其餘大部分的 NoSQL 不一樣,Redis 是存在事務的,儘管它沒有數據庫那麼強大,可是它仍是頗有用的,尤爲是在那些須要高併發的網站當中。redis
使用 Redis 讀/寫數據要比數據庫快得多,若是使用 Redis 事務在某種場合下去替代數據庫事務,則能夠在保證數據一致性的同時,大幅度提升數據讀/寫的響應速度。互聯網系統面向的是公衆,不少用戶同時訪問服務器的可能性很大,尤爲在一些商品搶購、搶紅包等場合,對性能和數據的一致性有着很高的要求,而存儲系統的讀/寫響應速度對於這類場景的性能的提升是十分重要的。spring
在 Redis 中,也存在多個客戶端同時向 Redis 系統發送命令的併發可能性,所以同一個數據,可能在不一樣的時刻被不一樣的線程所操縱,這樣就出現了併發下的數據一致的問題。爲了保證異性數據的安全性,Redis 爲提供了事務方案。而 Redis 的事務是使用 MULTI-EXEC 的命令組合,使用它能夠提供兩個重要的保證:數據庫
在 Redis 的鏈接中,請注意要求是一個鏈接,因此更多的時候在使用 Spring 中會使用 SessionCallback 接口進行處理,在 Redis 中使用事務會通過 3 個過程:編程
Redis 事務命令,如表所示。緩存
命 令 | 說 明 | 備 注 |
---|---|---|
multi | 開啓事務命令,以後的命令就進入隊列,而不會立刻被執行 | 在事務生存期間,全部的 Redis 關於數據結構的命令都會入隊 |
watch key1 [key2......] | 監聽某些鍵,當被監聽的鍵在事務執行前被修改,則事務會被回滾 | 使用樂觀鎖 |
unwatch key1 [key2......] | 取消監聽某些鍵 | —— |
exec | 執行事務,若是被監聽的鍵沒有被修改,則採用執行命令,不然就回滾命令 | 在執行事務隊列存儲的命令前,Redis 會檢測被監聽的鍵值對有沒有發生變化,若是沒有則執行命令, 不然就回滾事務 |
discard | 回滾事務 | 回滾進入隊列的事務命令,以後就不能再用 exec 命令提交了 |
在 Redis 中開啓事務是 multi 命令,而執行事務是 exec 命令。multi 到 exec 命令之間的 Redis 命令將採起進入隊列的形式,直至 exec 命令的出現,纔會一次性發送隊列裏的命令去執行,而在執行這些命令的時候其餘客戶端就不能再插入任何命令了,這就是 Redis 的事務機制。安全
Redis 命令執行事務的過程,如圖 1 所示。服務器
圖 1 Redis命令執行事務的過程微信
從圖 1 中能夠看到,先使用 multi 啓動了 Redis 的事務,所以進入了 set 和 get 命令,咱們能夠發現它並未立刻執行,而是返回了一個「QUEUED」的結果。網絡
這說明 Redis 將其放入隊列中,並不會立刻執行,當命令執行到 exec 的時候它就會把隊列中的命令發送給 Redis 服務器,這樣存儲在隊列中的命令就會被執行了,因此纔會有「OK」和「value1」的輸出返回。數據結構
若是回滾事務,則可使用 discard 命令,它就會進入在事務隊列中的命令,這樣事務中的方法就不會被執行了,使用 discard 命令取消事務如圖 2 所示。
圖 2 使用 discard 命令取消事務
當咱們使用了 discard 命令後,再使用 exec 命令時就會報錯,由於 discard 命令已經取消了事務中的命令,而到了 exec 命令時,隊列裏面已經沒有命令能夠執行了,因此就出現了報錯的狀況。
教程前面咱們討論過,在 Spring 中要使用同一個鏈接操做 Redis 命令的場景,這個時候咱們藉助的是 Spring 提供的 SessionCallback 接口,採用 Spring 去實現本節的命令,代碼以下所示。
ApplicationContext applicationContext= new ClassPathXmlApplicationContext("applicationContext.xml"); RedisTemplate redisTemplate = applicationContext.getBean(RedisTemplate.class); SessionCallback callBack = (SessionCallback) (RedisOperations ops)-> { ops.multi(); ops.boundValueOps("key1").set("value1"); //注意因爲命令只是進入隊列,而沒有被執行,因此此處採用get命令,而value卻返回爲null String value = (String) ops.boundValueOps("key1").get(); System.out.println ("事務執行過程當中,命令入隊列,而沒有被執行,因此value爲空: value="+value); //此時list會保存以前進入隊列的全部命令的結果 List list = ops.exec(); //執行事務 //事務結束後,獲取value1 value = (String) redisTemplate.opsForValue().get("key1"); return value; }; //執行Redis的命令 String value = (String)redisTemplate.execute(callBack); System.out.println(value);
這裏採用了 Lambda 表達式(注意,Java 8 之後才引入 Lambda 表達式)來爲 SessionCallBack 接口實現了業務邏輯。從代碼看,使用了 SessionCallBack 接口,從而保證全部的命令都是經過同一個 Redis 的鏈接進行操做的。
在使用 multi 命令後,要特別注意的是,使用 get 等返回值的方法一概返回爲空,由於在 Redis 中它只是把命令緩存到隊列中,而沒有去執行。使用 exec 後就會執行事務,執行完了事務後,執行 get 命令就能正常返回結果了。
最後使用 redisTemplate.execute(callBack); 就能執行咱們在 SessionCallBack 接口定義的 Lambda 表達式的業務邏輯,並將得到其返回值。執行代碼後能夠看到這樣的結果,如圖 3 所示:
圖 3 運行結果
須要再強調的是:這裏打印出來的 value=null,是由於在事務中,全部的方法都只會被緩存到 Redis 事務隊列中,而沒有當即執行,因此返回爲 null,這是在 Java 對 Redis 事務編程中開發者極其容易犯錯的地方,必定要十分注意才行。若是咱們但願獲得 Redis 執行事務各個命令的結果,能夠用這行代碼:
List list = ops.exec(); //執行事務
這段代碼將返回以前在事務隊列中全部命令的執行結果,並保存在一個 List 中,咱們只要在 SessionCallback 接口的 execute 方法中將 list 返回,就能夠在程序中得到各個命令執行的結果了。
對於 Redis 而言,不僅僅須要注意其事務處理的過程,其回滾的能力也和數據庫不太同樣,這也是須要特別注意的一個問題——Redis 事務遇到的命令格式正確而數據類型不符合,如圖所示。
圖 1 Redis事務遇到命令格式正確而數據類型不符合
從圖 1 中可知,咱們將 key1 設置爲字符串,而使用命令 incr 對其自增,可是命令只會進入事務隊列,而沒有被執行,因此它不會有任何的錯誤發生,而是等待 exec 命令的執行。
當 exec 命令執行後,以前進入隊列的命令就依次執行,當遇到 incr 時發生命令操做的數據類型錯誤,因此顯示出了錯誤,而其以前和以後的命令都會被正常執行。注意,這裏命令格式是正確的,問題在於數據類型,對於命令格式是錯誤的倒是另一種情形,如圖 2 所示。
圖 2 Redis事務遇到命令格式錯誤的
從圖 2 中能夠看到咱們使用的 incr 命令格式是錯誤的,這個時候 Redis 會當即檢測出來併產生錯誤,而在此以前咱們設置了 key1,在此以後咱們設置了 key2。當事務執行的時候,咱們發現 key2 的值爲空,說明被 Redis 事務回滾了。
經過上面兩個例子,能夠看出在執行事務命令的時候,在命令入隊的時候,Redis 就會檢測事務的命令是否正確,若是不正確則會產生錯誤。不管以前和以後的命令都會被事務所回滾,就變爲何都沒有執行。
當命令格式正確,而由於操做數據結構引發的錯誤,則該命令執行出現錯誤,而其以前和以後的命令都會被正常執行。這點和數據庫很不同,這是須要讀者注意的地方。
對於一些重要的操做,咱們必須經過程序去檢測數據的正確性,以保證 Redis 事務的正確執行,避免出現數據不一致的狀況。Redis 之因此保持這樣簡易的事務,徹底是爲了保證移動互聯網的核心問題——性能。
在 Redis 中使用 watch 命令能夠決定事務是執行仍是回滾。通常而言,能夠在 multi 命令以前使用 watch 命令監控某些鍵值對,而後使用 multi 命令開啓事務,執行各種對數據結構進行操做的命令,這個時候這些命令就會進入隊列。
當 Redis 使用 exec 命令執行事務的時候,它首先會去比對被 watch 命令所監控的鍵值對,若是沒有發生變化,那麼它會執行事務隊列中的命令,提交事務;若是發生變化,那麼它不會執行任何事務中的命令,而去事務回滾。不管事務是否回滾,Redis 都會去取消執行事務前的 watch 命令,這個過程如圖所示。
圖 1 Redis 執行事務過程
Redis 參考了多線程中使用的 CAS(比較與交換,Compare And Swap)去執行的。在數據高併發環境的操做中,咱們把這樣的一個機制稱爲樂觀鎖。
這句話仍是比較抽象,也很差理解。
因此先簡要論述其操做的過程,當一條線程去執行某些業務邏輯,可是這些業務邏輯操做的數據可能被其餘線程共享了,這樣會引起多線程中數據不一致的狀況。
爲了克服這個問題,首先,在線程開始時讀取這些多線程共享的數據,並將其保存到當前進程的副本中,咱們稱爲舊值(old value),watch 命令就是這樣的一個功能。
而後,開啓線程業務邏輯,由 multi 命令提供這一功能。在執行更新前,比較當前線程副本保存的舊值和當前線程共享的值是否一致,若是不一致,那麼該數據已經被其餘線程操做過,這次更新失敗。
爲了保持一致,線程就不去更新任何值,而將事務回滾;不然就認爲它沒有被其餘線程操做過,執行對應的業務邏輯,exec 命令就是執行「相似」這樣的一個功能。
注意,「相似」這個字眼,由於不徹底是,緣由是 CAS 原理會產生 ABA 問題。所謂 ABA 問題來自於 CAS 原理的一個設計缺陷,它可能引起 ABA 問題,如表 1 所示。
時間順序 | 線程1 | 線程2 | 說明 |
---|---|---|---|
T1 | X=A | — | 線程 1 加入監控 X |
T2 | 複雜運算開始 | 修改 X=B | 線程 2 修改 X,此刻爲 B |
T3 | 處理簡單業務 | — | |
T4 | 修改 X=A | 線程 2 修改 X,此刻又變回 A | |
T5 | 結束線程 2 | 線程 2 結束 | |
T6 | 檢測X=A,驗證經過,提交事務 | — | CAS 原理檢測經過,由於和舊值保持一致 |
在處理複雜運算的時候,被線程 2 修改的 X 的值有可能致使線程 1 的運算出錯,而最後線程 2 將 X 的值修改成原來的舊值 A,那麼到了線程 1 運算結束的時間順序 T6,它將檢測 X 的值是否發生變化,就會拿舊值 A 和當前的 X 的值 A 比對,結果是一致的,因而提交事務。
而後在複雜計算的過程當中 X 被線程 2 修改過了,這會致使線程 1 的運算出錯。在這個過程當中,對於線程 2 而言,X 的值的變化爲 A->B->A,因此 CAS 原理的這個設計缺陷被形象地稱爲「ABA 問題」。
僅僅記錄一箇舊值去比較是不足夠的,還要經過其餘方法避免 ABA 問題。常見的方法如 Hibernate 對緩存的持久對象(PO)加入字段 version 值,當每次操做一次該 PO,則 version=version+1,這樣採用 CAS 原理探測 version 字段,就能在多線程的環境中,排除 ABA 問題,從而保證數據的一致性。
關於 CAS 和樂觀鎖的概念,本教程還會從更深層次討論它們,暫時討論到這裏,當討論完了 CAS 和樂觀鎖,讀者再回頭來看這個過程,就會有更深的理解了。
從上面的分析能夠看出,Redis 在執行事務的過程當中,並不會阻塞其餘鏈接的併發,而只是經過比較 watch 監控的鍵值對去保證數據的一致性,因此 Redis 多個事務徹底能夠在非阻塞的多線程環境中併發執行,並且 Redis 的機制是不會產生 ABA 問題的,這樣就有利於在保證數據一致的基礎上,提升高併發系統的數據讀/寫性能。
下面演示一個成功提交的事務,如表 2 所示。
時刻 | 客戶端 | 說 明 |
---|---|---|
T1 | set key1 value1 | 初始化key1 |
T2 | watch key1 | 監控 key1 的鍵值對 |
T3 | multi | 開啓事務 |
T4 | set key2 value2 | 設置 key2 的值 |
T5 | exec | 提交事務,Redis 會在這個時間點檢測 key1 的值在 T2 時刻後,有沒有被其餘命令修改過,若是沒有,則提交事務去執行 |
這裏咱們使用了 watch 命令設置了一個 key1 的監控,而後開啓事務設置 key2,直至 exec 命令去執行事務,這個過程和圖 2 所演示的同樣。
圖 2 運行結果
這裏咱們看到了一個事務的過程,而 key2 也在事務中被成功設置。下面將演示一個提交事務的案例,如表 3 所示。
時刻 | 客戶端1 | 客戶端2 | 說 明 |
---|---|---|---|
T1 | set key1 value1 | 客戶端1:返回 OK | |
T2 | watch key1 | 客戶端1:監控 key1 | |
T3 | multi | 客戶端1:開啓事務 | |
T4 | set key2 value2 | 客戶端1:事務命令入列 | |
T5 | —— | set key1 vall | 客戶端2:修改 key1 的值 |
T6 | exec | —— | 客戶端1:執行事務,可是事務會先檢査在 T2 時刻被監控的 key1 是否被 其餘命令修改過。 由於客戶端 2 修改過,因此它會回滾事務,事實上若是客戶端執行的是 set key1 value1 命令,它也會認爲 key1 被修改過,而後返回(nil),因此是不會產生 ABA 問題的 |
圖 3 測試 Redis 事務回滾
在表 3 中有比較詳盡的說明,注意 T2 和 T6 時刻命令的說明,使用 Redis 事務要掌握這些內容。
在事務中 Redis 提供了隊列,這是一個能夠批量執行任務的隊列,這樣性能就比較高,可是使用 multi...exec 事務命令是有系統開銷的,由於它會檢測對應的鎖和序列化命令。
有時候咱們但願在沒有任何附加條件的場景下去使用隊列批量執行一系列的命令,從而提升系統性能,這就是 Redis 的流水線(pipelined)技術。
而現實中 Redis 執行讀/寫速度十分快,而系統的瓶頸每每是在網絡通訊中的延時,如圖 1 所示。
圖 1 系統的瓶頸
在實際的操做中,每每會發生這樣的場景,當命令 1 在時刻 T1 發送到 Redis 服務器後,服務器就很快執行完了命令 1,而命令 2 在 T2 時刻卻沒有經過網絡送達 Redis 服務器,這樣就變成了 Redis 服務器在等待命令 2 的到來,當命令 2 送達,被執行後,而命令 3 又沒有送達 Redis,Redis 又要繼續等待,依此類推,這樣 Redis 的等待時間就會很長,不少時候在空閒的狀態,而問題出在網絡的延遲中,形成了系統瓶頸。
爲了解決這個問題,可使用 Redis 的流水線,可是 Redis 的流水線是一種通訊協議,經過 Java API 或者使用 Spring 操做它,測試一下它的性能,代碼以下。
Jedis jedis = pool.getResource(); long start = System.currentTimeMillis(); // 開啓流水線 Pipeline pipeline = jedis.pipelined(); // 這裏測試10萬條的讀/寫2個操做 for (int i = 0; i < 100000; i++) { int j = i + 1; pipeline.set("pipeline_key_" + j, "pipeline_value_" + j); pipeline.get("pipeline_key_" + j); } // pipeline.sync(); //這裏只執行同步,可是不返回結果 // pipeline.syncAndReturnAll ();將返回執行過的命令返回的List列表結果 List result = pipeline.syncAndRetrunAll(); long end = System.currentTimeMillis(); // 計算耗時 System.err.println("耗時:" + (end - start) + "毫秒");
在電腦上測試這段代碼,它的耗時在 550 毫秒到 700 毫秒之間,也就是不到 1 秒的時間就完成多達 10 萬次讀/寫,可見其性能遠超數據庫。筆者的測試是 1 秒 2 萬屢次,可見使用流水線後其性能提升了數倍之多,效果十分明顯。執行過的命令的返回值都會放入到一個 List 中。
注意,這裏只是爲了測試性能而已,當你要執行不少的命令並返回結果的時候,須要考慮 List 對象的大小,由於它會「吃掉」服務器上許多的內存空間,嚴重時會致使內存不足,引起 JVM 溢出異常,因此在工做環境中,是須要讀者本身去評估的,能夠考慮使用迭代的方式去處理。
在 Spring 中,執行流水線和執行事務的方法一模一樣都比較簡單,使用 RedisTemplate 提供的 executePipelined 方法便可。下面將上面代碼的功能修改成 Spring 的形式供你們參考,代碼以下所示。
public static void testPipeline() { Applicationcontext applicationcontext = new ClassPathXmlApplicationContext("applicationcontext.xml"); RedisTemplate redisTemplate = applicationcontext.getBean(RedisTemplate.class); // 使用Java8的Lambda表達式 SessionCallback callBack = (SessionCallback) (RedisOperations ops)-> { for (int i = 0; i<100000; i++) { int j = i + 1; ops . boundValueOps ("pipeline_key_" + j ).set("piepeline_value_"+j); ops.boundValueOps("pipeline_key_" + j).get(); } return null; }; long start = System.currentTimeMillis(); //執行Redis的流水線命令 List resultList= redisTemplate.executePipelined(callBack); long end = System.currentTimeMillis(); System.err.println(end-start); }
這段代碼進行測試,其性能慢於不用 RedisTemplate 的,測試消耗的時間大約在 1 100 毫秒到 1 300 毫秒之間,也就是消耗的時間大約是其兩倍,但也屬於徹底能夠接受的性能範圍,一樣的在執行不少命令的時候,也須要考慮其對運行環境內存空間的開銷。
當使用銀行卡消費的時候,銀行每每會經過微信、短信或郵件通知用戶這筆交易的信息,這即是一種發佈訂閱模式,這裏的發佈是交易信息的發佈,訂閱則是各個渠道。這在實際工做中十分經常使用,Redis 支持這樣的一個模式。
發佈訂閱模式首先須要消息源,也就是要有消息發佈出來,好比例子中的銀行通知。首先是銀行的記帳系統,收到了交易的命令,成功記帳後,它就會把消息發送出來,這個時候,訂閱者就能夠收到這個消息進行處理了,觀察者模式就是這個模式的典型應用了。下面用圖 1 描述這樣的一個過程。
圖 1 交易信息發佈訂閱機制
這裏創建了一個消息渠道,短信系統、郵件系統和微信系統都在監聽這個渠道,一旦記帳系統把交易消息發送到消息渠道,則監聽這個渠道的各個系統就能夠拿到這個消息,這樣就能處理各自的任務了。它也有利於系統的拓展,好比如今新增一個彩信平臺,只要讓彩信平臺去監聽這個消息渠道便能獲得對應的消息了。
從上面的分析能夠知道如下兩點:
一樣的,Redis 也是如此。首先來註冊一個訂閱的客戶端,這個時候使用 SUBSCRIBE 命令。
好比監聽一個叫做 chat 的渠道,這個時候咱們須要先打開一個客戶端,這裏記爲客戶端 1,而後輸入命令:
SUBSCRIBE chat
這個時候客戶端 1 就會訂閱了一個叫做 chat 渠道的消息了。以後打開另一個客戶端,記爲客戶端 2,輸入命令:
publish chat "let's go!!"
這個時候客戶端 2 就向渠道 chat 發送消息:
"let's go!!"
咱們觀察客戶端 1,就能夠發現已經收到了消息,並有對應的信息打印出來。Redis 的發佈訂閱過程如圖 2 和圖 3 所示。
圖 2 Redis的發佈訂閱過程(1)
圖 3 Redis的發佈訂閱過程(2)
其出現的前後順序爲先出現圖 2 的上半部分,執行圖 3 命令以後運行結果爲圖 2 所示,當發佈消息的時候,對應的客戶端已經獲取到了這個信息。
下面在 Spring 的工做環境中展現如何配置發佈訂閱模式。首先提供接收消息的類,它將實現 org.springframework.data.redis.connection.MessageListener 接口,並實現接口定義的方法 public void onMessage(Message message,byte[]pattern),Redis 發佈訂閱監聽類代碼以下所示。
/*** imports ***/ public class RedisMessageListener implements MessageListener { private RedisTemplate redisTemplate; /*** 此處省略redisTemplate的 setter和getter方法 ***/ @Override public void onMessage(Message message, byte[] bytes) { // 獲取消息 byte[] body = message.getBody(); // 使用值序列化器轉換 String msgBody = (String) getRedisTemplate().getValueSerializer() .deserialize(body); System.err.println(msgBody); // 獲取 channel byte[] channel = message.getChannel(); // 使用字符串序列化器轉換 String channelStr = (String) getRedisTemplate().getStringSerializer() .deserialize(channel); System.err.println(channelStr); // 渠道名稱轉換 String bytesStr = new String(bytes); System.err.println(bytesStr); } }
爲了在 Spring 中使用這個類,須要對其進行配置。
<bean id="redisMsgListener" class="com.redis.listener.RedisMessageListener"> <property name="redisTemplate" ref="redisTemplate"/> </bean>
這樣就在 Spring 上下文中定義了監聽類。
有了監聽類還不能進行測試。爲了進行測試,要給一個監聽容器,在 Spring 中已有類 org.springframework.data.redis.listener.RedisMessageListenerContainer。它能夠用於監聽 Redis 的發佈訂閱消息,下面的配置就是爲了實現這個功能,讀者能夠經過註釋來了解它的配置要點。
<bean id="topicContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer" destroy-method="destroy"> <!--Redis鏈接工廠 --> <property name="connectionFactory" ref="connectionFactory" /> <!--鏈接池,這裏只要線程池生存,才能繼續監聽 --> <property name="taskExecutor"> <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler"> <property name="poolSize" value="3" /> </bean> </property> <!--消息監聽Map --> <property name="messageListeners"> <map> <!-- 配置監聽者,key-ref和bean id定義一致 --> <entry key-ref="redisMsgListener"> <!--監聽類 --> <bean class="org.springframework.data.redis.listener.ChannelTopic"> <constructor-arg value="chat" /> </bean> </entry> </map> </property> </bean>
這裏配置了線程池,這個線程池將會持續的生存以等待消息傳入,而這裏配置了容器用 id 爲 redisMsgListener的Bean 進行對渠道 chat 的監聽。當消息經過渠道 chat 發送的時候,就會使用 id 爲 redisMsgListener 的 Bean 進行處理消息。
經過如下代碼測試 Redis 發佈訂閱。
public static void main(String[] args) { ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml"); RedisTemplate redisTemplate = applicationContext.getBean(RedisTemplate.class); String channel = "chat"; redisTemplate.convertAndSend(channel, "I am lazy!!"); }
convertAndSend 方法就是向渠道 chat 發送消息的,當發送後對應的監聽者就能監聽到消息了。運行它,後臺就會打出對應的消息。