golang實現redis的延時消息隊列功能

前言

在學習過程當中發現redis的zset還能夠用來實現輕量級的延時消息隊列功能,雖然可靠性還有待提升,可是對於一些對數據可靠性要求不那麼高的功能要求徹底能夠實現。本次主要採用了redis中zset中的zadd, zrangebyscore 和 zdel來實現一個小demo。git

提早準備 安裝redis, redis-go

由於用的是macOS, 直接github

$ brew install redis
$ go get github.com/garyburd/redigo/redis
複製代碼

又由於比較懶,生成任務的惟一id時,直接採用了bson中的objectId,因此:redis

$ go get gopkg.in/mgo.v2/bson
複製代碼

惟一id不是必須有,但若是以後有實際應用須要攜帶,便於查找相應任務。shell

生產者

經過一個for循環生成10w個任務, 每個任務有不一樣的時間json

func producer() {
	count := 0
	//生成100000個任務
	for count < 100000 {
		count++
		dealTime := int64(rand.Intn(5)) + time.Now().Unix()
		uuid := bson.NewObjectId().Hex()
		redis.Client.AddJob(&job.JobMessage{
			Id: uuid,
			DealTime: dealTime,
		},  + int64(dealTime))
	}
}
複製代碼

其中AddJob函數在另外一個包中, 將上一個函數中隨機生成的時間做爲須要處理的時間戳.bash

// 添加任務
func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) {
	conn := client.Get()
	defer conn.Close()

	key := "JOB_MESSAGE_QUEUE"
	conn.Do("zadd", key, dealTime, util.JsonEncode(msg))
}
複製代碼

消費者

消費者處理流程分爲兩個步驟:函數

  1. 獲取小於等於當前時間戳的任務
  2. 經過刪除當前任務來判斷誰得到了當前任務

由於在獲取小於等於當前時間戳的任務時,可能有多個go routine同時讀到了當前任務,而只有一個任務能夠來處理當前任務。所以咱們須要經過一個方案來判斷究竟由誰來處理這個任務(固然若是隻有一個消費者能夠讀到就直接處理):這個時候能夠經過redis的刪除操做來獲取,由於刪除指定value時只有成功的操做纔會返回不爲0,因此咱們能夠認爲刪除當前隊列成功的那個go routine拿到了當前的任務。學習

下面是代碼:ui

// 消費者
func consumer() {
	// 啓動10個go routine一塊兒去拿
	count := 0
	for count < 10 {
		go func() {
			for {
				jobs := redis.Client.GetJob()
				if len(jobs) <= 0 {
					time.Sleep(time.Second * 1)
					continue
				}
				currentJob := jobs[0]
				// 若是當前搶redis隊列成功,
				if redis.Client.DelJob(currentJob) > 0 {
					var jobMessage job.JobMessage
					util.JsonDecode(currentJob, &jobMessage) //自定義的json解析函數
					handleMessage(&jobMessage)
				}

			}

		}()
		count++
	}
}

// 處理任務用函數
func handleMessage(msg *job.JobMessage) {
	fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime)
	go func() {
		countChan <- true
	}()
}
複製代碼

redis部分的代碼,獲取任務和刪除任務spa

// 獲取任務
func (client *RedisClient) GetJob() []string {
	conn := client.Get()
	defer conn.Close()

	key := "JOB_MESSAGE_QUEUE"
	timeNow := time.Now().Unix()
	ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1))
	if err != nil {
		panic(err)
	}
	return ret
}

// 刪除當前任務, 用來判斷是否搶到了當前任務
func (client *RedisClient) DelJob(value string) int {
	conn := client.Get()
	defer conn.Close()

	key := "JOB_MESSAGE_QUEUE"
	ret, err := redis.Int(conn.Do("zrem", key, value))
	if err != nil {
		panic(err)
	}
	return ret
}
複製代碼

代碼大抵如此。最後跑起來以後,大概每3-4秒鐘可以處理掉1w個任務,速度上確實是...

相關文章
相關標籤/搜索