Redis 實戰 —— 09. 實現任務隊列、消息拉取和文件分發

任務隊列 P133

經過將待執行任務的相關信息放入隊列裏面,並在以後對隊列進行處理,能夠推遲執行那些耗時對操做,這種將工做交給任務處理器來執行對作法被稱爲任務隊列 (task queue) 。 P133git

先進先出隊列 P133

能夠 Redis 的列表結構存儲任務的相關信息,並使用 RPUSH 將待執行任務的相關信息推入列表右端,使用阻塞版本的彈出命令 BLPOP 從隊列中彈出待執行任務的相關信息(由於任務處理器除了執行任務不須要執行其餘工做)。 P134github

發送任務redis

// 將任務參數推入指定任務對應的列表右端
func SendTask(conn redis.Conn, queueName string, param string) (bool, error) {
	count, err := redis.Int(conn.Do("RPUSH", queueName, param))
	if err != nil {
		return false, nil
	}
	// 只有成功推入 1 個纔算成功發送
	return count == 1, nil
}

執行任務json

// 不斷從任務對應的列表中獲取任務參數,並執行任務
func RunTask(conn redis.Conn, queueName string, taskHandler func(param string)) {
	for ; ; {
		result, err := redis.Strings(conn.Do("BLPOP", queueName, 10))
		// 若是成功獲取任務信息,則執行任務
		if err != nil && len(result) == 2 {
			taskHandler(result[1])
		}
	}
}

以上代碼是任務隊列與 Redis 交互的通用版本,使用方式簡單,只須要將入參信息序列化成字符串傳入便可發送一個任務,提供一個處理任務的方法回調便可執行任務。數組

任務優先級 P136

在此基礎上能夠講原有的先進先出任務隊列改成具備優先級的任務隊列,即高優先級的任務須要在低優先級的任務以前執行。 BLPOP 將彈出第一個非空列表的第一個元素,因此咱們只須要將全部任務隊列名數組按照優先級降序排序,讓任務隊列名數組做爲 BLPOP 的入參便可實現上述功能(固然這種若是高優先級任務的生成速率大於消費速率,那麼低優先級的任務就永遠不會執行)。 P136網絡

優先執行高優先級任務數據結構

// 不斷從任務對應的列表中獲取任務參數,並執行任務
// queueNames 從前日後的優先級依次下降
func RunTasks(conn redis.Conn, queueNames []string, queueNameToTaskHandler map[string]func(param string)) {
	// 校驗是否全部任務都有對應的處理方法
	for _, queueName := range queueNames {
		if _, exists := queueNameToTaskHandler[queueName]; !exists {
			panic(fmt.Sprintf("queueName(%v) not in queueNameToTaskHandler", queueName))
		}
	}
	// 將全部入參放入同一個數組
	length := len(queueNames)
	args := make([]interface{}, length + 1)
	for i := 0; i < length; i++ {
		args[i] = queueNames[i]
	}
	args[length] = 10
	for ; ; {
		result, err := redis.Strings(conn.Do("BLPOP", args...))
		// 若是成功獲取任務信息,則執行任務
		if err != nil && len(result) == 2 {
			// 找到對應的處理方法並執行
			taskHandler := queueNameToTaskHandler[result[0]]
			taskHandler(result[1])
		}
	}
}
延遲任務 P136

實際業務場景中還存在某些任務須要在指定時間進行操做,例如:郵件定時發送等。此時還須要存儲任務執行的時間,並將能夠執行的任務放入剛剛的任務隊列中。可使用有序集合進行存儲,時間戳做爲分值,任務相關信息及隊列名等信息的 json 串做爲鍵。ide

發送延遲任務函數

// 存儲延遲任務的相關信息,用於序列化和反序列化
type delayedTaskInfo struct {
	UnixNano  int64  `json:"unixNano"`
	QueueName string `json:"queueName"`
	Param     string `json:"param"`
}
// 發送一個延遲任務
func SendDelayedTask(conn redis.Conn, queueName string, param string, executeAt time.Time) (bool, error) {
	// 若是已到執行時間,則直接發送到任務隊列
	if executeAt.UnixNano() <= time.Now().UnixNano() {
		return SendTask(conn, queueName, param)
	}
	// 還未到執行時間,須要放入有序集合
	// 序列化相關信息
	infoJson, err := json.Marshal(delayedTaskInfo{
		UnixNano: time.Now().UnixNano(),
		QueueName:queueName,
		Param:param,
	})
	if err != nil {
		return false, err
	}
	// 放入有序集合
	count, err := redis.Int(conn.Do("ZADD", "delayed_tasks", infoJson, executeAt.UnixNano()))
	if err != nil {
		return false, err
	}
	// 只有成功加入 1 個纔算成功
	return count == 1, nil
}

拉取可執行的延遲任務,放入任務隊列idea

// 輪詢延遲任務,將可執行的任務放入任務隊列
func PollDelayedTask(conn redis.Conn) {
	for ; ; {
		// 獲取最先須要執行的任務
		infoMap, err := redis.StringMap(conn.Do("ZRANGE", "delayed_tasks", 0, 0, "WITHSCORES"))
		if err != nil || len(infoMap) != 1 {
			// 睡 1ms 再繼續
			time.Sleep(time.Millisecond)
			continue
		}
		for infoJson, unixNano := range infoMap {
			// 已到時間,放入任務隊列
			executeAt, err := strconv.Atoi(unixNano)
			if err != nil {
				log.Errorf("#PollDelayedTask -> convert unixNano to int error, infoJson: %v, unixNano: %v", infoJson, unixNano)
				// 作一些後續處理,例如:刪除該條信息,防止耽誤其餘延遲任務
			}
			if int64(executeAt) <= time.Now().UnixNano() {
				// 反序列化
				info := new(delayedTaskInfo)
				err := json.Unmarshal([]byte(infoJson), info)
				if err != nil {
					log.Errorf("#PollDelayedTask -> infoJson unmarshal error, infoJson: %v, unixNano: %v", infoJson, unixNano)
					// 作一些後續處理,例如:刪除該條信息,防止耽誤其餘延遲任務
				}
				// 從有序集合刪除該信息,並放入任務隊列
				count, err := redis.Int(conn.Do("ZREM", "delayed_tasks", infoJson))
				if err != nil && count == 1 {
					_, _ = SendTask(conn, info.QueueName, info.Param)
				}
			} else {
				// 未到時間,睡 1ms 再繼續
				time.Sleep(time.Millisecond)
			}
		}
	}
}

有序集合不具有列表的阻塞彈出機制,因此程序須要不斷循環,並嘗試從隊列中獲取要被執行的任務,這一操做會增大網絡和處理器的負載。能夠經過在函數裏面增長一個自適應方法 (adaptive method) ,讓函數在一段時間內都沒有發現可執行的任務時,自動延長休眠時間,或者根據下一個任務的執行時間來決定休眠的時長,並將休眠時長的最大值限制爲 100ms ,從而確保任務能夠被及時執行。 P138

消息拉取 P139

兩個或多個客戶端在互相發送和接收消息的時候,一般會使用如下兩種方法來傳遞信息: P139

  • 消息推送 (push messaging) :即由發送者來確保全部接受者已經成功接收到了消息。 Redis 內置了用於進行消息推送的 PUBLISH 命令和 SUBSCRIBE 命令(05. Redis 其餘命令簡介 介紹了這兩個命令的用法和缺陷)
  • 消息拉取 (pull messaging) :即由接受者本身去獲取存儲的信息
單個接受者 P140

單個接受者時,只須要將發送的信息保存至每一個接收者對應的列表中便可,使用 RPUSH 能夠向執行接受者發送消息,使用 LTRIM 能夠移除列表中的前幾個元素來獲取收到的消息。 P140

多個接受者 P141

多個接受者的狀況相似羣組,即羣組內的人發消息,其餘人均可以收到。咱們可使用如下幾個數據結構存儲所需數據,以便實現咱們的所需的功能:

  • STRING: 羣組的消息自增 id
    • INCR: 實現 id 自增並獲取
  • ZSET: 存儲該羣組中的每一條信息,分值爲當前羣組內的消息自增 id
    • ZRANGEBYSCORE: 得到未獲取的消息
  • ZSET: 存儲該羣組中每一個人得到的最新一條消息的 id ,全部消息均未獲取時爲 0
    • ZCARD: 獲取羣組人數
    • ZRANGE: 通過處理後,可實現哪些消息成功被哪些人接收了的功能
    • ZRANGE: 獲取 id 最小數據,可實現刪除被全部人獲取過的消息的功能
  • ZSET: 存儲一我的全部羣組得到的最新一條消息的 id ,離開羣組時自動刪除,加入羣組時初始化爲 0
    • ZCARD: 獲取所在的羣組個數
    • ZRANGE: 通過處理後,可實現批量拉取全部羣組的未獲取的消息的功能

文件分發 P145

根據地理位置聚合用戶數據 P146

如今擁有每一個 ip 天天進行活動的時間和具體操做,現須要計算天天每一個城市的人操做數量(相似於統計日活)。

原始數據十分巨大,因此須要分批讀入內存進行聚合統計,而聚合後的數據相對來講很小,因此徹底能夠在內存中進行聚合統計,完成後再將結果寫入 Redis 中,能夠有效減小程序與 Redis 服務的通訊次數,縮短任務時間。

日誌分發及處理

如今有一臺機器的本地日誌須要交給多個日誌處理器進行不一樣的分析。

這種場景相似羣組,因此咱們能夠複用上面提到的支持多個接受者的消息拉取組件。

本地機器:

  1. 將全部日誌發送至羣組,最後再發送一條結束消息
  2. 等待全部日誌處理器處理完(羣組對應的完成標識 = 羣組內的成員數 - 1)
  3. 清理本次發送的全部日誌

日誌處理器:

  1. 不斷從羣組中拉取消息,並進入相關處理,直至拉取到結束消息
  2. 對羣組對應的完成標識進行 INCR ,表示當前日誌處理器已完成處理

本文首發於公衆號:滿賦諸機(點擊查看原文) 開源在 GitHub :reading-notes/redis-in-action

相關文章
相關標籤/搜索