P133
經過將待執行任務的相關信息放入隊列裏面,並在以後對隊列進行處理,能夠推遲執行那些耗時對操做,這種將工做交給任務處理器來執行對作法被稱爲任務隊列 (task queue) 。 P133
git
P133
能夠 Redis
的列表結構存儲任務的相關信息,並使用 RPUSH
將待執行任務的相關信息推入列表右端,使用阻塞版本的彈出命令 BLPOP
從隊列中彈出待執行任務的相關信息(由於任務處理器除了執行任務不須要執行其餘工做)。 P134
github
發送任務redis
// 將任務參數推入指定任務對應的列表右端 func SendTask(conn redis.Conn, queueName string, param string) (bool, error) { count, err := redis.Int(conn.Do("RPUSH", queueName, param)) if err != nil { return false, nil } // 只有成功推入 1 個纔算成功發送 return count == 1, nil }
執行任務json
// 不斷從任務對應的列表中獲取任務參數,並執行任務 func RunTask(conn redis.Conn, queueName string, taskHandler func(param string)) { for ; ; { result, err := redis.Strings(conn.Do("BLPOP", queueName, 10)) // 若是成功獲取任務信息,則執行任務 if err != nil && len(result) == 2 { taskHandler(result[1]) } } }
以上代碼是任務隊列與 Redis
交互的通用版本,使用方式簡單,只須要將入參信息序列化成字符串傳入便可發送一個任務,提供一個處理任務的方法回調便可執行任務。數組
P136
在此基礎上能夠講原有的先進先出任務隊列改成具備優先級的任務隊列,即高優先級的任務須要在低優先級的任務以前執行。 BLPOP
將彈出第一個非空列表的第一個元素,因此咱們只須要將全部任務隊列名數組按照優先級降序排序,讓任務隊列名數組做爲 BLPOP
的入參便可實現上述功能(固然這種若是高優先級任務的生成速率大於消費速率,那麼低優先級的任務就永遠不會執行)。 P136
網絡
優先執行高優先級任務數據結構
// 不斷從任務對應的列表中獲取任務參數,並執行任務 // queueNames 從前日後的優先級依次下降 func RunTasks(conn redis.Conn, queueNames []string, queueNameToTaskHandler map[string]func(param string)) { // 校驗是否全部任務都有對應的處理方法 for _, queueName := range queueNames { if _, exists := queueNameToTaskHandler[queueName]; !exists { panic(fmt.Sprintf("queueName(%v) not in queueNameToTaskHandler", queueName)) } } // 將全部入參放入同一個數組 length := len(queueNames) args := make([]interface{}, length + 1) for i := 0; i < length; i++ { args[i] = queueNames[i] } args[length] = 10 for ; ; { result, err := redis.Strings(conn.Do("BLPOP", args...)) // 若是成功獲取任務信息,則執行任務 if err != nil && len(result) == 2 { // 找到對應的處理方法並執行 taskHandler := queueNameToTaskHandler[result[0]] taskHandler(result[1]) } } }
P136
實際業務場景中還存在某些任務須要在指定時間進行操做,例如:郵件定時發送等。此時還須要存儲任務執行的時間,並將能夠執行的任務放入剛剛的任務隊列中。可使用有序集合進行存儲,時間戳做爲分值,任務相關信息及隊列名等信息的 json
串做爲鍵。ide
發送延遲任務函數
// 存儲延遲任務的相關信息,用於序列化和反序列化 type delayedTaskInfo struct { UnixNano int64 `json:"unixNano"` QueueName string `json:"queueName"` Param string `json:"param"` } // 發送一個延遲任務 func SendDelayedTask(conn redis.Conn, queueName string, param string, executeAt time.Time) (bool, error) { // 若是已到執行時間,則直接發送到任務隊列 if executeAt.UnixNano() <= time.Now().UnixNano() { return SendTask(conn, queueName, param) } // 還未到執行時間,須要放入有序集合 // 序列化相關信息 infoJson, err := json.Marshal(delayedTaskInfo{ UnixNano: time.Now().UnixNano(), QueueName:queueName, Param:param, }) if err != nil { return false, err } // 放入有序集合 count, err := redis.Int(conn.Do("ZADD", "delayed_tasks", infoJson, executeAt.UnixNano())) if err != nil { return false, err } // 只有成功加入 1 個纔算成功 return count == 1, nil }
拉取可執行的延遲任務,放入任務隊列idea
// 輪詢延遲任務,將可執行的任務放入任務隊列 func PollDelayedTask(conn redis.Conn) { for ; ; { // 獲取最先須要執行的任務 infoMap, err := redis.StringMap(conn.Do("ZRANGE", "delayed_tasks", 0, 0, "WITHSCORES")) if err != nil || len(infoMap) != 1 { // 睡 1ms 再繼續 time.Sleep(time.Millisecond) continue } for infoJson, unixNano := range infoMap { // 已到時間,放入任務隊列 executeAt, err := strconv.Atoi(unixNano) if err != nil { log.Errorf("#PollDelayedTask -> convert unixNano to int error, infoJson: %v, unixNano: %v", infoJson, unixNano) // 作一些後續處理,例如:刪除該條信息,防止耽誤其餘延遲任務 } if int64(executeAt) <= time.Now().UnixNano() { // 反序列化 info := new(delayedTaskInfo) err := json.Unmarshal([]byte(infoJson), info) if err != nil { log.Errorf("#PollDelayedTask -> infoJson unmarshal error, infoJson: %v, unixNano: %v", infoJson, unixNano) // 作一些後續處理,例如:刪除該條信息,防止耽誤其餘延遲任務 } // 從有序集合刪除該信息,並放入任務隊列 count, err := redis.Int(conn.Do("ZREM", "delayed_tasks", infoJson)) if err != nil && count == 1 { _, _ = SendTask(conn, info.QueueName, info.Param) } } else { // 未到時間,睡 1ms 再繼續 time.Sleep(time.Millisecond) } } } }
有序集合不具有列表的阻塞彈出機制,因此程序須要不斷循環,並嘗試從隊列中獲取要被執行的任務,這一操做會增大網絡和處理器的負載。能夠經過在函數裏面增長一個自適應方法 (adaptive method) ,讓函數在一段時間內都沒有發現可執行的任務時,自動延長休眠時間,或者根據下一個任務的執行時間來決定休眠的時長,並將休眠時長的最大值限制爲 100ms ,從而確保任務能夠被及時執行。 P138
P139
兩個或多個客戶端在互相發送和接收消息的時候,一般會使用如下兩種方法來傳遞信息: P139
Redis
內置了用於進行消息推送的 PUBLISH
命令和 SUBSCRIBE
命令(05. Redis 其餘命令簡介 介紹了這兩個命令的用法和缺陷)P140
單個接受者時,只須要將發送的信息保存至每一個接收者對應的列表中便可,使用 RPUSH
能夠向執行接受者發送消息,使用 LTRIM
能夠移除列表中的前幾個元素來獲取收到的消息。 P140
P141
多個接受者的狀況相似羣組,即羣組內的人發消息,其餘人均可以收到。咱們可使用如下幾個數據結構存儲所需數據,以便實現咱們的所需的功能:
INCR
: 實現 id 自增並獲取ZRANGEBYSCORE
: 得到未獲取的消息ZCARD
: 獲取羣組人數ZRANGE
: 通過處理後,可實現哪些消息成功被哪些人接收了的功能ZRANGE
: 獲取 id 最小數據,可實現刪除被全部人獲取過的消息的功能ZCARD
: 獲取所在的羣組個數ZRANGE
: 通過處理後,可實現批量拉取全部羣組的未獲取的消息的功能P145
P146
如今擁有每一個 ip 天天進行活動的時間和具體操做,現須要計算天天每一個城市的人操做數量(相似於統計日活)。
原始數據十分巨大,因此須要分批讀入內存進行聚合統計,而聚合後的數據相對來講很小,因此徹底能夠在內存中進行聚合統計,完成後再將結果寫入 Redis
中,能夠有效減小程序與 Redis
服務的通訊次數,縮短任務時間。
如今有一臺機器的本地日誌須要交給多個日誌處理器進行不一樣的分析。
這種場景相似羣組,因此咱們能夠複用上面提到的支持多個接受者的消息拉取組件。
本地機器:
日誌處理器:
INCR
,表示當前日誌處理器已完成處理本文首發於公衆號:滿賦諸機(點擊查看原文) 開源在 GitHub :reading-notes/redis-in-action