Redis 實戰 —— 05. Redis 其餘命令簡介

發佈與訂閱 P52

Redis 實現了發佈與訂閱(publish/subscribe)模式,又稱 pub/sub 模式(與設計模式中的觀察者模式相似)。訂閱者負責訂閱頻道,發送者負責向頻道發送二進制字符串消息。每當有消息被髮送至給定頻道時,頻道的全部訂閱者都會接收到消息。git

發佈與訂閱命令 P52
命令 格式 描述
SUBSCRIBE SUBSCRIBE channel [channel ...] 訂閱一個或多個頻道
UNSUBSCRIBE UNSUBSCRIBE [channel [channel ...]] 退訂一個或多個頻道;沒有指定頻道,則退訂所有頻道
PUBLISH PUBLISH channel message 給指定頻道發送消息,返回接收到消息的訂閱者數量
PSUBSCRIBE PSUBSCRIBE pattern [pattern ...] 訂閱一個或多個模式,與模式匹配的頻道均會訂閱
PUNSUBSCRIBE PUNSUBSCRIBE [pattern [pattern ...]] 退訂一個或多個模式;沒有指定模式,則退訂所有模式

相關演示代碼以下:github

// 執行發佈訂閱相關操做(注意:pubSubConn 中的 Conn 對象不能是 conn 對象,即必須創建兩個不一樣的鏈接)
func executePubSubOperation(pubSubConn redis.PubSubConn, conn redis.Conn) {
	// 監聽頻道消息並輸出
	go func() {
		for ; ; {
			switch result := pubSubConn.Receive().(type) {
			case redis.Message:
				// byte 轉 string
				resultMap := map[string]string  {
					"Channel": result.Channel,
					"Pattern": result.Pattern,
					"Data": string(result.Data),
				}
				handleResult(resultMap, nil)
			case redis.Subscription:
				handleResult(result, nil)
			}

		}
	}()

	// 訂閱兩個頻道(因爲 Subscribe 內沒有執行 Receive,因此只有 error,沒有錯誤時就輸出 nil)
	// 訂閱者收到相應的消息訂閱信息,分別輸出 -> {subscribe channel_1 1} 和 {subscribe channel_2 2}
	handleResult(nil, pubSubConn.Subscribe("channel_1", "channel_2"))
	// 訂閱兩個模式,分別以 _1 和 g_2 爲結尾的頻道 (因爲 PSubscribe 內沒有執行 Receive,因此只有 error,沒有錯誤時就輸出 nil)
	// 訂閱者收到相應的消息訂閱信息,分別輸出 -> {psubscribe *_1 3} 和 {psubscribe *g_2 4}
	handleResult(nil, pubSubConn.PSubscribe("*_1", "*g_2"))

	time.Sleep(time.Second)

	// 發佈消息到頻道 channel_1,輸出 -> 2,兩個訂閱者接收到消息
	// 訂閱者分別輸出 -> map[Channel:channel_1 Data:channel1 Pattern:] 和 map[Channel:channel_1 Data:channel1 Pattern:*_1]
	handleResult(conn.Do("PUBLISH", "channel_1", "channel1"))
	// 發佈消息到頻道 channel_2,輸出 -> 1,一個訂閱者接收到消息
	// 訂閱者輸出 -> map[Channel:channel_2 Data:channel1 Pattern:]
	handleResult(conn.Do("PUBLISH", "channel_2", "channel1"))

	// 退訂兩個頻道(因爲 Subscribe 內沒有執行 Receive,因此只有 error,沒有錯誤時就輸出 nil)
	// 訂閱者收到相應的消息退訂信息,分別輸出 -> {unsubscribe channel_1 3} 和 {unsubscribe channel_2 2}
	handleResult(nil, pubSubConn.Unsubscribe("channel_1", "channel_2"))
	// 退訂兩個頻道(因爲 Subscribe 內沒有執行 Receive,因此只有 error,沒有錯誤時就輸出 nil)
	// 訂閱者收到相應的消息退訂信息,分別輸出 -> {punsubscribe *_1 1} 和 {punsubscribe *g_2 0}
	handleResult(nil, pubSubConn.PUnsubscribe("*_1", "*g_2"))

	time.Sleep(time.Second)
}
風險 P54
  • 穩定性:舊版 Redis 的客戶端讀取消息不夠快時,不斷積壓的消息就會使 Redis 的緩衝區愈來愈大,可能致使 Redis 的速度變慢,甚至直接崩潰,也有使 Redis 可能被操做系統強制殺死。新版 Redis 會自動斷開不符合 client-output-buffer-limit pubsub 配置選項要求的客戶端。
  • 可靠性:任何網絡系統在執行操做時都有可能會趕上斷線狀況,而斷線產生的鏈接錯誤一般會使得網絡鏈接兩端中的其中一端進行從新鏈接。若是客戶端在執行訂閱操做的過程當中斷線,那麼客戶端將丟失在斷線期間發送的全部消息。

排序 P54

SORT 命令能夠對列表、集合和有序集合進行排序 ,能夠將 SORT 命令看做使 SQL 中的 order by 子句。 P55redis

命令 格式 描述
SORT SORT key [BY pattern] [LIMIT offset count] [GET pattern [GET pattern ...]] [ASC|DESC] [ALPHA] [STORE destination] 根據給定的選項,返回或保存給定列表、集合、有序集合 key 中通過排序的元素

可實現功能: P55數據庫

  • 根據升序或降序排序元素(使用 [ASC|DESC],默認爲升序)
  • 將元素看做數字或者字符串進行排序(使用 [ALPHA] 能夠看成字符串排序,默認爲數字)
  • 使用被排序元素以外的其餘值做爲權重來排序,甚至還能夠從輸入的列表、集合、有序集合之外的其餘地方進行取值(使用 [BY pattern] 能夠根據指定值排序;可使用不存在的鍵做爲參數選項跳過排序沒直接返回結果)
  • 使用被排序元素以外的其餘值做爲返回結果(使用 [GET pattern [GET pattern ...]] 能夠根據排序結果返回相應的值)
  • 保存排序結果(使用 [STORE destination] 能夠指定將結果保存到指定 key,此時返回保存的元素的數量)
  • 限制返回結果(使用 [LIMIT offset count] 能夠指定要跳過的元素數量和返回的元素數量)

相關演示代碼以下:設計模式

// 執行 SORT 命令
func executeSortOperation(conn redis.Conn) {
	// 刪除原有值
	handleResult(redis.Int(conn.Do("DEL", "id", "age", "name", "destination")))
	// 初始化
	handleResult(redis.Int(conn.Do("RPUSH", "id", 1, 4, 3, 2, 5)))
	handleResult(redis.String(conn.Do("SET", "age_1", 15)))
	handleResult(redis.String(conn.Do("SET", "age_2", 14)))
	handleResult(redis.String(conn.Do("SET", "age_3", 11)))
	handleResult(redis.String(conn.Do("SET", "age_4", 12)))
	handleResult(redis.String(conn.Do("SET", "age_5", 10)))
	handleResult(redis.String(conn.Do("SET", "name_1", "tom")))
	handleResult(redis.String(conn.Do("SET", "name_2", "jerry")))
	handleResult(redis.String(conn.Do("SET", "name_3", "bob")))
	handleResult(redis.String(conn.Do("SET", "name_4", "mary")))
	handleResult(redis.String(conn.Do("SET", "name_5", "jack")))

	// 根據 id 降序排序,跳過第一個元素,獲取接下來的兩個元素,輸出 -> [4 3]
	handleResult(redis.Ints(conn.Do("SORT", "id", "LIMIT", "1", "2", "DESC")))
	// 根據 age_{id} 升序排序,按照 id age_{id} name_{id} 順序返回結果,輸出 -> [5 10 jack 3 11 bob 4 12 mary 2 14 jerry 1 15 tom]
	handleResult(redis.Strings(conn.Do("SORT", "id", "BY", "age_*", "GET", "#", "GET", "age_*", "GET", "name_*", "ALPHA")))
	// 根據 name_{id} 字典序降序排序,按照 id age_{id} name_{id} 順序返回結果,存儲到 destination 中
	// 輸出 -> 15
	handleResult(redis.Int(conn.Do("SORT", "id", "BY", "name_*", "GET", "#", "GET", "age_*", "GET", "name_*", "ALPHA", "DESC", "STORE", "destination")))
	// 輸出 列表 結果,輸出 -> [1 15 tom 4 12 mary 2 14 jerry 5 10 jack 3 11 bob]
	handleResult(redis.Strings(conn.Do("LRANGE", "destination", 0, -1)))
}

基本的 Redis 事務

Redis 有 5 個命令可讓用戶在不被打斷的狀況下對多個鍵執行操做,它們分別是: WATCHMULTIEXECUNWATCHDISCART 。基本的 Redis 事務只用 MULTIEXEC 便可,使用多個命令的事務將在之後進行介紹。 P56網絡

Redis 的基本事務可讓一個客戶端在不被其餘客戶端打斷的狀況下執行多個命令。當一個事務執行完畢以後, Redis 纔會處理其餘客戶端的命令。 P56併發

假如某個(或某些) key 正處於 WATCH 命令的監視之下,且事務塊中有和這個(或這些) key 相關的命令,那麼 EXEC 命令只在這個(或這些) key 沒有被其餘命令所改動的狀況下執行並生效,不然該事務被打斷(abort)。ide

命令 格式 描述
MULTI MULTI 標記一個事務塊的開始,老是返回 OK
EXEC EXEC 執行全部事務塊內的命令,按順序返回命令的執行結果。當操做被打斷時,返回 nil

相關演示代碼以下:函數

// 執行事務命令
func executeTransactionOperation(conn redis.Conn) {
	// 刪除原有值
	handleResult(redis.Int(conn.Do("DEL", "counter")))
	// 開啓事務(採用流水線方式,下降通訊開銷)
	handleResult(nil, conn.Send("MULTI"))
	// 事務中執行自增操做(採用流水線方式,下降通訊開銷)
	handleResult(nil, conn.Send("INCR", "counter"))
	handleResult(nil, conn.Send("INCR", "counter"))
	handleResult(nil, conn.Send("INCR", "counter"))
	// 執行命令,依次執行自增操做,分別返回操做結果,輸出 -> [1 2 3]
	handleResult(redis.Ints(conn.Do("EXEC")))
}
練習題:移除競爭條件 P58

簡單實踐 - 文章投票VoteArticle 函數內曾說明沒有事務控制,會存在併發問題。該函數包含一個競爭條件以及一個由於競爭條件而出現的 bug 。函數的競爭條件可能會形成內存泄漏,而函數的 bug 則可能會致使不正確的投票結果出現。你能想辦法修復它們嗎?性能

提示:若是你以爲很難理解競爭條件爲何會致使內存泄漏,那麼能夠在分析 簡單實踐 - 文章投票 中的 PostArticle 的函數的同時,閱讀一下 6.2.5 節。

  • 感受仍是沒法理解爲何會有這種狀況,強行猜想如下可能性(雖然都不是競爭條件形成的):

    • PostArticle 函數中,在將做者加入到投票用戶集合中後,給其設定過時時間。若是設定過時時間以前因爲某些原有異常致使沒有進行相關操做,那麼這個集合將一直在內存中,不會過時,從而形成內存泄漏。
    • VoteArticle 函數中,若是將投票用戶添加到投票用戶集合中後,還沒來得及給文章的相關信息進行設置,那麼這個用戶之後不能再投票,而且文章的投票信息不對。
  • 不是太明白究竟在競爭什麼,只能針對以上問題處理。用事務只能再添加一個集合在事務中標記事務是否執行成功,處理流程大體以下:

    1. 先將用戶與文章做爲值加入到這個集合
    2. 再將用戶加入到投票集合中
    3. 而後開啓事務,依次發送更新信息的命令和刪除那個集合中的相關信息,並執行
    4. 最後有一個 worker 掃描這個集合,將其中值拿出來解析出用戶和文章,再查改用戶是否已在集合中,若是在集合中,則從新執行 步驟3,最後刪除該值
練習題:提升性能 P58

簡單實踐 - 文章投票ListArticles 函數在獲取整個頁面的文章時,須要在 Redis 與客戶端之間最多會進行 26 次通訊往返,這種作法十分低效,你可否想個辦法將 ListArticles 函數的往返次數下降爲 2 次呢?

提示:使用流水線

  • 獲取文章列表時,先獲取相應的 id 列表(最多 25 個),再循環獲取每一個 id 對應的文章,因此最多會進行 26 次通訊往返
  • 因爲必須先獲取 id 列表,再獲取每一個 id 對應的文章,因此只能將這兩塊分開,因此最低至少有 2 次通訊往返。大體流程以下:
    1. 先獲取 id 列表
    2. 使用流水線,依次將獲取每一個 id 的文章的命令發送至緩衝區,最後與服務端通訊並執行命令(Go 中可使用上述事務演示代碼的方式進行操做 )
    3. 最後按照順序解析結果

過時時間 P58

只有少數幾個命令能夠原子地爲鍵設置過時時間,而且對於列表、集合、哈希表和有序集合這樣的容器來講,鍵過時命令只能爲整個鍵設置過時時間,而沒辦法爲鍵裏面的單個元素設置過時時間(可使用存儲時間戳的有序集合來實現針對單個元素的過時時間;也能夠之前綴的形式將容器中的單個元素變爲字符串)。 P58

用於處理過時時間的 Redis 命令 P59
命令 格式 描述
PERSIST PERSIST key 移除鍵的過時時間
TTL TTL key 查看鍵距離過時時間還有多少秒
EXPIRE EXPIRE key seconds 讓鍵在指定的秒數以後過時
EXPIREAT EXPIREAT key timestamp 讓鍵在指定的 UNIX 秒級時間戳過時
PTTL PTTL key 查看鍵距離過時時間還有多少毫秒
PEXPIRE PEXPIRE key milliseconds 讓鍵在指定的毫秒數以後過時
PEXPIREAT PEXPIREAT key milliseconds-timestamp 讓鍵在指定的 UNIX 毫秒級時間戳過時

相關演示代碼以下:

// 指定過時時間相關的命令
func executeExpirationOperation(conn redis.Conn) {
	// 刪除原有值
	handleResult(redis.Int(conn.Do("DEL", "string")))
	// 設置字符串的值爲 value,輸出 -> OK,string 變爲 -> value
	handleResult(redis.String(conn.Do("SET", "string", "value")))
	// 查看 string 的過時時間,輸出 -> -1,表示不過時
	handleResult(redis.Int(conn.Do("TTL", "string")))
	// 設置 string 在 3 秒後過時,輸出 -> 1
	handleResult(redis.Int(conn.Do("EXPIRE", "string", 3)))
	time.Sleep(time.Second)
	// 查看 string 的過時時間,輸出 -> 2
	handleResult(redis.Int(conn.Do("TTL", "string")))
	// 移除 string 的過時時間,輸出 -> 1
	handleResult(redis.Int(conn.Do("PERSIST", "string")))
	// 查看 string 的過時時間,輸出 -> -1,表示不過時
	handleResult(redis.Int(conn.Do("TTL", "string")))

	// 設置 string 在當前時間 2500 毫秒後過時,輸出 -> 1
	handleResult(redis.Int(conn.Do("PEXPIREAT", "string", time.Now().UnixNano() / 1e6 + 2500)))
	time.Sleep(time.Second)
	// 查看 string 的過時時間,輸出 -> 1499,表示還有 1499 毫秒過時
	handleResult(redis.Int(conn.Do("PTTL", "string")))
	time.Sleep(2 * time.Second)
	// 查看 string 的過時時間,輸出 -> -2,表示已過時
	handleResult(redis.Int(conn.Do("PTTL", "string")))
}
練習題:使用 EXPIRE 命令代替時間戳有序集合 P59

簡單實踐 - Web應用中使用了一個根據時間戳排序、用於清除會話信息的有序集合,經過這個有序集合,程序能夠在清理會話的時候,對用戶瀏覽過的商品以及用戶購物車裏面的商品進行分析。可是,若是咱們決定不對商品進行分析的話,那麼就可使用 Redis 提供的過時時間操做來自動清理過時的會話信息,而無須使用清理函數。那麼,你可否修改簡單實踐 - Web應用中定義的 UpdateToken 函數和 UpdateCartItem 函數,讓它們使用過時時間操做來刪除會話信息,從而代替目前使用有序集合來記錄並清除會話信息的作法呢?

  • UpdateToken 函數:令牌於 userId 的對應關係不在存儲於哈希表中,而是之前綴的形式將容器中的單個元素變爲字符串(上面提到過),並設置過時時間,並移除最近操做時間有序集合,這樣令牌到期後就會自動刪除,不須要清理函數了。
  • UpdateCartItem 函數:因爲當時此處把 Redis 看成數據庫使用,認爲購物車不該該隨登陸態的失效而消失,因此購物車與 userId 掛鉤,不存在上述問題。可是若是要讓購物車也自動過時,就須要在 UpdateToken 函數內同時設置購物車的過時時間便可。

本文首發於公衆號:滿賦諸機(點擊查看原文) 開源在 GitHub :reading-notes/redis-in-action

相關文章
相關標籤/搜索