玩轉redis-簡單消息隊列

使用go語言基於redis寫了一個簡單的消息隊列
源碼地址
使用demogit

redis的 list 很是的靈活,能夠從左邊或者右邊添加元素,固然也以從任意一頭讀取數據github

添加數據和獲取數據的操做也是很是簡單的 LPUSH 從左邊插入數據 RPUSH 大右邊插入數據 LPOP 從左邊取出一個數據 RPOP 從右邊取出一個數據redis

127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"
複製代碼

或者使用 BLPOP BRPOP 來讀取數據,不一樣之處是取數據時,若是沒有數據會等待指定的時間, 若是這期間有數據寫入,則會讀取並返回,沒有數據則會返回空 在一個窗口1讀取json

127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"
複製代碼

在另外一個窗口2寫入bash

127.0.0.1:6379> RPUSH list1 a b c
(integer) 3
複製代碼

再開一個窗口3讀取,第二次讀取時,list是空的,因此等待1秒後返回空。併發

127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"

127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)
複製代碼

簡單消息隊列的實現

若是咱們只從一邊新增元素,向另外一邊取出元素,這就不是一個消息隊列麼。但我估計你會有一個疑問,在消費數據時,同一個消息會不會同時被多個consumer消費掉? ui

固然不會,由於redis是單線程的,在從list取數據時自然不會出現併發問題。可是這是一個簡單的消息隊列,消費不成功怎麼處理仍是須要咱們本身寫代碼來實現的spa

下面我說一下使用list實現一個簡單的消息隊列的總體思路線程

comsumer的實現

consumer 主要作的就是從list裏讀取數據,使用LPOP或者BLPOP均可以, 這裏作了一個開關 optionsUseBLopp若是爲true時會使用BLPOPcode

type consumer struct {
	once            sync.Once
	redisCmd        redis.Cmdable
	ctx             context.Context
	topicName       string
	handler         Handler
	rateLimitPeriod time.Duration
	options         ConsumerOptions
	_               struct{}
}

type ConsumerOptions struct {
	RateLimitPeriod time.Duration
	UseBLPop        bool
}

複製代碼

看一下建立consumer的代碼,最後面的opts參數是可選的配置

type Consumer = *consumer

func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
	consumer := &consumer{
		redisCmd:  redisCmd,
		ctx:       ctx,
		topicName: topicName,
	}
	for _, o := range opts {
		o(&consumer.options)
	}
	if consumer.options.RateLimitPeriod == 0 {
		consumer.options.RateLimitPeriod = time.Microsecond * 200
	}
	return consumer
}

複製代碼

讀取數據後具體怎麼進行處理調用者能夠根據本身的業務邏輯進行相應處理 有一個小的interface調用者根據本身的邏輯去實現

type Handler interface {
	HandleMessage(msg *Message)
}
複製代碼

讀取數據的邏輯使用一個gorouting實現

func (s *consumer) startGetMessage() {
	go func() {
		ticker := time.NewTicker(s.options.RateLimitPeriod)
		defer func() {
			log.Println("stop get message.")
			ticker.Stop()
		}()
		for {
			select {
			case <-s.ctx.Done():
				log.Printf("context Done msg: %#v \n", s.ctx.Err())
				return
			case <-ticker.C:
				var revBody []byte
				var err error
				if !s.options.UseBLPop {
					revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
				} else {
					revs := s.redisCmd.BLPop(time.Second, s.topicName)
					err = revs.Err()
					revValues := revs.Val()
					if len(revValues) >= 2 {
						revBody = []byte(revValues[1])
					}
				}
				if err == redis.Nil {
					continue
				}
				if err != nil {
					log.Printf("LPOP error: %#v \n", err)
					continue
				}

				if len(revBody) == 0 {
					continue
				}
				msg := &Message{}
				json.Unmarshal(revBody, msg)
				if s.handler != nil {
					s.handler.HandleMessage(msg)
				}
			}
		}
	}()
}

複製代碼

Producer 的實現

Producer仍是很簡單的就是把數據推送到 reids

type Producer struct {
	redisCmd redis.Cmdable
	_        struct{}
}

func NewProducer(cmd redis.Cmdable) *Producer {
	return &Producer{redisCmd: cmd}
}

func (p *Producer) Publish(topicName string, body []byte) error {
	msg := NewMessage("", body)
	sendData, _ := json.Marshal(msg)
	return p.redisCmd.RPush(topicName, string(sendData)).Err()
}
複製代碼
相關文章
相關標籤/搜索