使用go
語言基於redis
寫了一個簡單的消息隊列
源碼地址
使用demogit
redis的 list
很是的靈活,能夠從左邊或者右邊添加元素,固然也以從任意一頭讀取數據github
添加數據和獲取數據的操做也是很是簡單的 LPUSH
從左邊插入數據 RPUSH
大右邊插入數據 LPOP
從左邊取出一個數據 RPOP
從右邊取出一個數據redis
127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"
複製代碼
或者使用 BLPOP
BRPOP
來讀取數據,不一樣之處是取數據時,若是沒有數據會等待指定的時間, 若是這期間有數據寫入,則會讀取並返回,沒有數據則會返回空 在一個窗口1
讀取json
127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"
複製代碼
在另外一個窗口2
寫入bash
127.0.0.1:6379> RPUSH list1 a b c
(integer) 3
複製代碼
再開一個窗口3
讀取,第二次讀取時,list
是空的,因此等待1秒後返回空。併發
127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"
127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)
複製代碼
若是咱們只從一邊新增元素,向另外一邊取出元素,這就不是一個消息隊列麼。但我估計你會有一個疑問,在消費數據時,同一個消息會不會同時被多個consumer
消費掉? ui
固然不會,由於redis是單線程的,在從list
取數據時自然不會出現併發問題。可是這是一個簡單的消息隊列,消費不成功怎麼處理仍是須要咱們本身寫代碼來實現的spa
下面我說一下使用list
實現一個簡單的消息隊列的總體思路線程
consumer
主要作的就是從list裏讀取數據,使用LPOP
或者BLPOP
均可以, 這裏作了一個開關 options
的UseBLopp
若是爲true
時會使用BLPOP
。code
type consumer struct {
once sync.Once
redisCmd redis.Cmdable
ctx context.Context
topicName string
handler Handler
rateLimitPeriod time.Duration
options ConsumerOptions
_ struct{}
}
type ConsumerOptions struct {
RateLimitPeriod time.Duration
UseBLPop bool
}
複製代碼
看一下建立consumer
的代碼,最後面的opts
參數是可選的配置
type Consumer = *consumer
func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
consumer := &consumer{
redisCmd: redisCmd,
ctx: ctx,
topicName: topicName,
}
for _, o := range opts {
o(&consumer.options)
}
if consumer.options.RateLimitPeriod == 0 {
consumer.options.RateLimitPeriod = time.Microsecond * 200
}
return consumer
}
複製代碼
讀取數據後具體怎麼進行處理調用者能夠根據本身的業務邏輯進行相應處理 有一個小的interface
調用者根據本身的邏輯去實現
type Handler interface {
HandleMessage(msg *Message)
}
複製代碼
讀取數據的邏輯使用一個gorouting實現
func (s *consumer) startGetMessage() {
go func() {
ticker := time.NewTicker(s.options.RateLimitPeriod)
defer func() {
log.Println("stop get message.")
ticker.Stop()
}()
for {
select {
case <-s.ctx.Done():
log.Printf("context Done msg: %#v \n", s.ctx.Err())
return
case <-ticker.C:
var revBody []byte
var err error
if !s.options.UseBLPop {
revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
} else {
revs := s.redisCmd.BLPop(time.Second, s.topicName)
err = revs.Err()
revValues := revs.Val()
if len(revValues) >= 2 {
revBody = []byte(revValues[1])
}
}
if err == redis.Nil {
continue
}
if err != nil {
log.Printf("LPOP error: %#v \n", err)
continue
}
if len(revBody) == 0 {
continue
}
msg := &Message{}
json.Unmarshal(revBody, msg)
if s.handler != nil {
s.handler.HandleMessage(msg)
}
}
}
}()
}
複製代碼
Producer
仍是很簡單的就是把數據推送到 reids
type Producer struct {
redisCmd redis.Cmdable
_ struct{}
}
func NewProducer(cmd redis.Cmdable) *Producer {
return &Producer{redisCmd: cmd}
}
func (p *Producer) Publish(topicName string, body []byte) error {
msg := NewMessage("", body)
sendData, _ := json.Marshal(msg)
return p.redisCmd.RPush(topicName, string(sendData)).Err()
}
複製代碼