基於golang和redis實現輕量級隊列

Github: 基於golang和redis實現輕量級隊列

1. 概述

gmq是基於redis提供的特性,使用go語言開發的一個簡單易用的隊列;關於redis使用特性能夠參考以前本人寫過一篇很簡陋的文章Redis 實現隊列; gmq的靈感和設計是基於有贊延遲隊列設計,文章內容清晰並且很好理解,可是沒有提供源碼,在文章的最後也提到了一些將來架構方向; gmq不是簡單按照有贊延遲隊列的設計實現功能,在它的基礎上,作了一些修改和優化,主要以下:php

  • 功能上
    • 多種任務模式,不僅僅只是延遲隊列;例如:延遲隊列,普通隊列,優先級隊列
  • 架構上:
    • 添加job由dispatcher調度分配各個bucket,而不是由timer
    • 每一個bucket維護一個timer,而不是全部bucket一個timer
    • timer每次掃描bucket到期job時,會一次性返回多個到期job,而不是每次只返回一個job
    • timer的掃描時鐘由bucket中下個job到期時間決定,而不是每秒掃描一次

2. 應用場景

  • 延遲任務
    • 延遲任務,例如用戶下訂單一直處於未支付狀態,半個小時候自動關閉訂單
  • 異步任務
    • 異步任務,通常用於耗時操做,例如羣發郵件等批量操做
  • 超時任務
    • 規定時間內(TTR)沒有執行完畢或程序被意外中斷,則消息從新回到隊列再次被消費,通常用於數據比較敏感,不容丟失的
  • 優先級任務
    • 當多個任務同時產生時,按照任務設定等級優先被消費,例如a,b兩種類型的job,優秀消費a,而後再消費b

3. gmq原理

gmq流程圖

3.1 核心概念

  • dispatcher任務調度器,負責將job分配到bucket或直接推送到ready queue
  • bucket任務桶,用於存放延遲任務;每一個bucket會維護一個timer定時器,而後將到期的job推送到ready queue
  • ready queue存放已準備好的job,等待被consumer消費

3.2 延遲時間delay

  • 當job.delay>0時,job會被分配到bucket中,bucket會有周期性掃描到期job,若是到期,job會被bucket移到ready queue,等待被消費
  • 當job.delay=0時,job會直接加到ready queue,等待被消費

3.3 執行超時時間TTR

參考第一個圖的流程,當job被消費者讀取後,若是job.TTR>0,即job設置了執行超時時間,那麼job會在讀取後會被添加到TTRBucket(專門存放設置了超時時間的job),而且設置job.delay = job.TTR,若是在TTR時間內沒有獲得消費者ack確認而後刪除job,job將在TTR時間以後添加到ready queue,而後再次被消費(若是消費者在TTR時間以後才請求ack,會獲得失敗的響應)python

3.3 確認機制

主要和TTR的設置有關係,確認機制能夠分爲兩種:git

  • 當job.TTR=0時,消費者pop出job時,即會自動刪除job pool中的job元數據
  • 當job.TTR>0時,即job執行超時時間,這個時間是指用戶pop出job時開始到用戶ack確認刪除結束這段時間,若是在這段時間沒有ACK,job會被再次加入到ready queue,而後再次被消費,只有用戶調用了ACK,纔會去刪除job pool中job元數據

4. 安裝

4.1 源碼運行

配置文件位於gmq/conf.ini,能夠根據本身項目需求修改配置github

git clone https://github.com/wuzhc/gmq.git
cd gmq
go get -u -v github.com/kardianos/govendor # 若是有就不須要安裝了
govendor sync
go run main.go
# go build # 可編譯成可執行文件
複製代碼

3.2 執行文件運行

# 啓動
./gmq start
# 中止
./gmq stop

# 守護進程模式啓動,不輸出日誌到console
nohup ./gmq start >/dev/null 2>&1  &
# 守護進程模式下查看日誌輸出(配置文件conf.ini須要設置target_type=file,filename=gmq.log)
tail -f gmq.log
複製代碼

5. 客戶端

目前只實現python,go,php語言的客戶端的demo,參考:github.com/wuzhc/demo/…golang

運行

# php
# 生產者
php producer.php
# 消費者
php consumer.php

# python
# 生產者
python producer.py
# 消費者
python consumer.py
複製代碼

一條消息結構

{
    "id": "xxxx",	 # 任務id,這個必須是一個惟一值,將做爲redis的緩存鍵
    "topic": "xxx",  # topic是一組job的分類名,消費者將訂閱topic來消費該分類下的job
    "body": "xxx",   # 消息內容
    "delay": "111",  # 延遲時間,單位秒
    "TTR": "11111",  # 執行超時時間,單位秒
    "status": 1,     # job執行狀態,該字段由gmq生成
    "consumeNum":1,  # 被消費的次數,主要記錄TTR>0時,被重複消費的次數,該字段由gmq生成
}
複製代碼

延遲任務

$data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '1800', // 單位秒,半個小時後執行
        'TTR'   => '0'
    ];
複製代碼

超時任務

$data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '100' // 100秒後還未獲得消費者ack確認,則再次添加到隊列,將再次被被消費
    ];
複製代碼

異步任務

$data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '0' 
    ];
複製代碼

優先級任務

$data = [
        'id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_A","topic_B","topic_C"], //優先消費topic_A,消費完後消費topic_B,最後再消費topic_C
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '0' 
    ];
複製代碼

6. web監控

gmq提供了一個簡單web監控平臺(後期會提供根據job.Id追蹤消息的功能),方便查看當前堆積任務數,默認監聽端口爲8000,例如:http://127.0.0.1:8000, 界面以下: web

後臺模板來源於https://github.com/george518/PPGo_Job

7. 遇到問題

如下是開發遇到的問題,以及一些粗糙的解決方案redis

7.1 安全退出

若是強行停止gmq的運行,可能會致使一些數據丟失,例以下面一個例子:
json

gmq之timer定時器

若是發生上面的狀況,就會出現job不在 bucket中,也不在 ready queue,這就出現了job丟失的狀況,並且將沒有任何機會去刪除 job pool中已丟失的job,長久以後 job pool可能會堆積不少的已丟失job的元數據;因此安全退出須要在接收到退出信號時,應該等待全部 goroutine處理完手中的事情,而後再退出

7.1.1 gmq退出流程

gmq安全退出.png

首先 gmq經過context傳遞關閉信號給 dispatcher, dispatcher接收到信號會關閉 dispatcher.closed,每一個 bucket會收到 close信號,而後先退出 timer檢索,再退出 bucket, dispatcher等待全部bucket退出後,而後退出

dispatcher退出順序流程: timer -> bucket -> dispatchersegmentfault

7.1.2 注意

不要使用kill -9 pid來強制殺死進程,由於系統沒法捕獲SIGKILL信號,致使gmq可能執行到一半就被強制停止,應該使用kill -15 pid,kill -1 pidkill -2 pid,各個數字對應信號以下:緩存

7.2 智能定時器

  • 每個bucket都會維護一個timer,不一樣於有贊設計,timer不是每秒輪詢一次,而是根據bucket下一個job到期時間來設置timer的定時時間 ,這樣的目的在於若是bucket沒有job或job到期時間要好久纔會發生,就能夠減小沒必要要的輪詢;
  • timer只有處理完一次業務後纔會重置定時器;,這樣的目的在於可能出現上一個時間週期還沒執行完畢,下一個定時事件又發生了
  • 若是到期的時間很相近,timer就會頻繁重置定時器時間,就目前使用來講,還沒出現什麼性能上的問題

7.3 原子性問題

咱們知道redis的命令是排隊執行,在一個複雜的業務中可能會屢次執行redis命令,若是在大併發的場景下,這個業務有可能中間插入了其餘業務的命令,致使出現各類各樣的問題;
redis保證整個事務原子性和一致性問題通常用multi/execlua腳本,gmq在操做涉及複雜業務時使用的是lua腳本,由於lua腳本除了有multi/exec的功能外,還有Pipepining功能(主要打包命令,減小和redis server通訊次數),下面是一個gmq定時器掃描bucket集合到期job的lua腳本:

-- 獲取到期的50個job
local jobIds = redis.call('zrangebyscore',KEYS[1], 0, ARGV[4], 'withscores', 'limit', 0, 50)
local res = {}
for k,jobId in ipairs(jobIds) do 
	if k%2~=0 then
		local jobKey = string.format('%s:%s', ARGV[3], jobId)
		local status = redis.call('hget', jobKey, 'status')
		-- 檢驗job狀態
		if tonumber(status) == tonumber(ARGV[1]) or tonumber(status) == tonumber(ARGV[2]) then
			-- 先移除集合中到期的job,而後到期的job返回給timer
			local isDel = redis.call('zrem', KEYS[1], jobId)
			if isDel == 1 then
				table.insert(res, jobId)
			end
		end
	end
end

local nextTime
-- 計算下一個job執行時間,用於設置timer下一個時鐘週期
local nextJob = redis.call('zrange', KEYS[1], 0, 0, 'withscores')
if next(nextJob) == nil then
	nextTime = -1
else
	nextTime = tonumber(nextJob[2]) - tonumber(ARGV[4])
	if nextTime < 0 then
		nextTime = 1
	end
end

table.insert(res,1,nextTime)
return res
複製代碼

7.4 redis鏈接池

可能通常phper寫業務不多會接觸到鏈接池,其實這是由php自己所決定他應用不大,固然在php的擴展swoole仍是頗有用處的
gmq的redis鏈接池是使用gomodule/redigo/redis自帶鏈接池,它帶來的好處是限制redis鏈接數,經過複用redis鏈接來減小開銷,另外能夠防止tcp被消耗完,這在生產者大量生成數據時會頗有用

// gmq/mq/redis.go
Redis = &RedisDB{
    Pool: &redis.Pool{
        MaxIdle:     30,    // 最大空閒連接
        MaxActive:   10000, // 最大連接
        IdleTimeout: 240 * time.Second, // 空閒連接超時
        Wait:        true, // 當鏈接池耗盡時,是否阻塞等待
        Dial: func() (redis.Conn, error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword(""))
            if err != nil {
                return nil, err
            }
            return c, nil
		},
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            if time.Since(t) < time.Minute {
            	return nil
            }
        	_, err := c.Do("PING")
        	return err
        },
    },
}
複製代碼

8. 注意問題

  • job.id在job pool是惟一的,它將做爲redis的緩存鍵;gmq不自動爲job生成惟一id值是爲了用戶能夠根據本身生成的job.id來追蹤job狀況,若是job.id是重複的,push時會報重複id的錯誤
  • bucket數量不是越多越好,通常來講,添加到ready queue的速度取決與redis性能,而不是bucket數量

9. 使用中可能出現的問題

9.1 客戶端出現大量的TIME_WAIT狀態,而且新的鏈接被拒絕

netstat -anp | grep 9503 | wc -l
tcp        0      0 10.8.8.188:41482        10.8.8.185:9503         TIME_WAIT   -                   
複製代碼

這個是正常現象,由tcp四次揮手能夠知道,當接收到LAST_ACK發出的FIN後會處於TIME_WAIT狀態,主動關閉方(客戶端)爲了確保被動關閉方(服務端)收到ACK,會等待2MSL時間,這個時間是爲了再次發送ACK,例如被動關閉方可能由於接收不到ACK而重傳FIN;另外也是爲了舊數據過時,不影響到下一個連接,; 若是要避免大量TIME_WAIT的鏈接致使tcp被耗盡;通常方法以下:

  • 使用長鏈接
  • 配置文件,網上不少教程,就是讓系統儘快的回收TIME_WAIT狀態的鏈接
  • 使用鏈接池,當鏈接池耗盡時,阻塞等待,直到回收再利用

10. 相關連接

11. 將來計劃

  • 支持安全傳輸層協議(TLS)
  • 除了json外,可支持protobuf序列化
  • web監控工具提供消息追蹤功能
  • 增長分佈式部署方案
  • 增長數據統計收集器
  • 可持久化到磁盤
  • 支持http協議
  • 增長調試和分析 pprof

12. 項目地址

相關文章
相關標籤/搜索