玩轉redis-延時消息隊列

上一篇基於redis的list實現了一個簡單的消息隊列:玩轉redis-簡單消息隊列html

源碼地址 使用demogit

產品經理常常說的一句話,咱們不光要有X功能,還要Y功能,這樣客戶才能更滿意。一樣的,只有簡單消息隊列是不夠的,還要有延時消息隊列才能算是一個完整的消息隊列。github

看看redis的命令,放眼望去,的有序集合(sorted set)就是一個很好用的命令,徹底能夠用他作一個延時消息隊列redis

redis有序集合(sorted set)

redis有序集合,每一個元素都會關聯一個double類型的分數。redis正是經過分數來爲集合中的成員進行從小到大的排序。 有序集合的成員是惟一的,但分數(score)卻能夠重複。json

簡單操做

添加數據bash

127.0.0.1:6379> ZADD testSet1 5 a
(integer) 1
127.0.0.1:6379> ZADD testSet1 1 b 8 c 7 d
(integer) 3
複製代碼

讀取ui

127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 3
1) "b"
127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 5
1) "b"
2) "a"
複製代碼

也能夠把score打出來spa

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf 5 WITHSCORES
1) "b"
2) "1"
3) "a"
4) "5"
複製代碼

查出全部的數據code

127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf inf
1) "b"
2) "a"
3) "d"
4) "c"
複製代碼

刪除數據orm

ZREMRANGEBYSCORE testSet1 0 2
複製代碼

延時隊列的實現思路

整體的思路很簡單,就是每個valuescore保存的是時間,也就是說,在添加一個元素時他的score是當前時間+延時的時間。輪循獲取數據時,查找小於或等於當前時間的數據項,就是具體的延時消息。

還有一個問題,就是ZRANGEBYSCORElistpop不一樣,pop是取出元素而且會把元素在list中刪除。ZRANGEBYSCORE只會取出數據不會把數據從sorted set中刪除。解決方法1,利用redis事務,先ZRANGEBYSCORE取出數據,而後再用ZREMRANGEBYSCORE 把數據刪除。

具體實現-code

添加延時消息,參數delay就是咱們要延時多久:

func (p *Producer) PublishDelayMsg(topicName string, body []byte, delay time.Duration) error {
	if delay <= 0 {
		return errors.New("delay need great than zero")
	}
	tm := time.Now().Add(delay)
	msg := NewMessage("", body)
	msg.DelayTime = tm.Unix()

	sendData, _ := json.Marshal(msg)
	return p.redisCmd.ZAdd(topicName+zsetSuffix, redis.Z{Score: float64(tm.Unix()), Member: string(sendData)}).Err()
}
複製代碼

使用,好比咱們想過1秒再處理

producer.PublishDelayMsg(topicName, body, time.Second)
複製代碼

讀取消息並處理 這就比較簡單了,就是在一個ticker裏循環讀取小於或等於當前時間的數據:

func (s *consumer) startGetDelayMessage() {
	go func() {
		ticker := time.NewTicker(s.options.RateLimitPeriod)
		defer func() {
			log.Println("stop get delay message.")
			ticker.Stop()
		}()
		topicName := s.topicName + zsetSuffix
		for {
			currentTime := time.Now().Unix()
			select {
			case <-s.ctx.Done():
				log.Printf("context Done msg: %#v \n", s.ctx.Err())
				return
			case <-ticker.C:
				var valuesCmd *redis.ZSliceCmd
				_, err := s.redisCmd.TxPipelined(func(pip redis.Pipeliner) error {
					valuesCmd = pip.ZRangeWithScores(topicName, 0, currentTime)
					pip.ZRemRangeByScore(topicName, "0", strconv.FormatInt(currentTime, 10))
					return nil
				})
				if err != nil {
					log.Printf("zset pip error: %#v \n", err)
					continue
				}
				rev := valuesCmd.Val()
				for _, revBody := range rev {
					msg := &Message{}
					json.Unmarshal([]byte(revBody.Member.(string)), msg)
					if s.handler != nil {
						s.handler.HandleMessage(msg)
					}
				}
			}
		}
	}()
}
複製代碼
相關文章
相關標籤/搜索