gmq
是基於redis
提供的特性,使用go
語言開發的一個簡單易用的隊列;關於redis使用特性能夠參考以前本人寫過一篇很簡陋的文章Redis 實現隊列; gmq
的靈感和設計是基於有贊延遲隊列設計,文章內容清晰並且很好理解,可是沒有提供源碼,在文章的最後也提到了一些將來架構方向; gmq
不是簡單按照有贊延遲隊列的設計實現功能,在它的基礎上,作了一些修改和優化,主要以下:php
dispatcher
調度分配各個bucket
,而不是由timer
bucket
維護一個timer
,而不是全部bucket一個timer
timer
每次掃描bucket
到期job
時,會一次性返回多個到期job
,而不是每次只返回一個job
timer
的掃描時鐘由bucket
中下個job
到期時間決定,而不是每秒掃描一次(TTR)
沒有執行完畢或程序被意外中斷,則消息從新回到隊列再次被消費,通常用於數據比較敏感,不容丟失的dispatcher
任務調度器,負責將job
分配到bucket
或直接推送到ready queue
bucket
任務桶,用於存放延遲任務;每一個bucket
會維護一個timer
定時器,而後將到期的job
推送到ready queue
ready queue
存放已準備好的job
,等待被consumer
消費ready queue
,等待被消費ready queue
,等待被消費參考第一個圖的流程,當job被消費者讀取後,若是job.TTR>0
,即job設置了執行超時時間,那麼job會在讀取後會被添加到TTRBucket(專門存放設置了超時時間的job),而且設置job.delay = job.TTR
,若是在TTR時間內沒有獲得消費者ack確認而後刪除job,job將在TTR時間以後添加到ready queue
,而後再次被消費(若是消費者在TTR時間以後才請求ack,會獲得失敗的響應)python
主要和TTR的設置有關係,確認機制能夠分爲兩種:git
pop
出job時,即會自動刪除job pool
中的job元數據pop
出job時開始到用戶ack
確認刪除結束這段時間,若是在這段時間沒有ACK
,job會被再次加入到ready queue
,而後再次被消費,只有用戶調用了ACK
,纔會去刪除job pool
中job元數據配置文件位於gmq/conf.ini
,能夠根據本身項目需求修改配置github
git clone https://github.com/wuzhc/gmq.git
cd gmq
go get -u -v github.com/kardianos/govendor # 若是有就不須要安裝了
govendor sync
go run main.go
# go build # 可編譯成可執行文件
複製代碼
# 啓動
./gmq start
# 中止
./gmq stop
# 守護進程模式啓動,不輸出日誌到console
nohup ./gmq start >/dev/null 2>&1 &
# 守護進程模式下查看日誌輸出(配置文件conf.ini須要設置target_type=file,filename=gmq.log)
tail -f gmq.log
複製代碼
目前只實現python,go,php語言的客戶端的demo,參考:github.com/wuzhc/demo/…golang
# php
# 生產者
php producer.php
# 消費者
php consumer.php
# python
# 生產者
python producer.py
# 消費者
python consumer.py
複製代碼
{
"id": "xxxx", # 任務id,這個必須是一個惟一值,將做爲redis的緩存鍵
"topic": "xxx", # topic是一組job的分類名,消費者將訂閱topic來消費該分類下的job
"body": "xxx", # 消息內容
"delay": "111", # 延遲時間,單位秒
"TTR": "11111", # 執行超時時間,單位秒
"status": 1, # job執行狀態,該字段由gmq生成
"consumeNum":1, # 被消費的次數,主要記錄TTR>0時,被重複消費的次數,該字段由gmq生成
}
複製代碼
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '1800', // 單位秒,半個小時後執行
'TTR' => '0'
];
複製代碼
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '100' // 100秒後還未獲得消費者ack確認,則再次添加到隊列,將再次被被消費
];
複製代碼
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '0'
];
複製代碼
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_A","topic_B","topic_C"], //優先消費topic_A,消費完後消費topic_B,最後再消費topic_C
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '0'
];
複製代碼
gmq
提供了一個簡單web監控平臺(後期會提供根據job.Id追蹤消息的功能),方便查看當前堆積任務數,默認監聽端口爲8000
,例如:http://127.0.0.1:8000, 界面以下: web
如下是開發遇到的問題,以及一些粗糙的解決方案redis
若是強行停止gmq
的運行,可能會致使一些數據丟失,例以下面一個例子:
json
bucket
中,也不在
ready queue
,這就出現了job丟失的狀況,並且將沒有任何機會去刪除
job pool
中已丟失的job,長久以後
job pool
可能會堆積不少的已丟失job的元數據;因此安全退出須要在接收到退出信號時,應該等待全部
goroutine
處理完手中的事情,而後再退出
gmq
退出流程gmq
經過context傳遞關閉信號給
dispatcher
,
dispatcher
接收到信號會關閉
dispatcher.closed
,每一個
bucket
會收到
close
信號,而後先退出
timer
檢索,再退出
bucket
,
dispatcher
等待全部bucket退出後,而後退出
dispatcher
退出順序流程: timer
-> bucket
-> dispatcher
segmentfault
不要使用kill -9 pid
來強制殺死進程,由於系統沒法捕獲SIGKILL信號,致使gmq可能執行到一半就被強制停止,應該使用kill -15 pid
,kill -1 pid
或kill -2 pid
,各個數字對應信號以下:緩存
bucket
都會維護一個timer
,不一樣於有贊設計,timer
不是每秒輪詢一次,而是根據bucket
下一個job到期時間來設置timer
的定時時間 ,這樣的目的在於若是bucket
沒有job或job到期時間要好久纔會發生,就能夠減小沒必要要的輪詢;timer
只有處理完一次業務後纔會重置定時器;,這樣的目的在於可能出現上一個時間週期還沒執行完畢,下一個定時事件又發生了timer
就會頻繁重置定時器時間,就目前使用來講,還沒出現什麼性能上的問題咱們知道redis的命令是排隊執行,在一個複雜的業務中可能會屢次執行redis命令,若是在大併發的場景下,這個業務有可能中間插入了其餘業務的命令,致使出現各類各樣的問題;
redis保證整個事務原子性和一致性問題通常用multi/exec
或lua腳本
,gmq
在操做涉及複雜業務時使用的是lua腳本
,由於lua腳本
除了有multi/exec
的功能外,還有Pipepining
功能(主要打包命令,減小和redis server
通訊次數),下面是一個gmq
定時器掃描bucket集合到期job的lua腳本:
-- 獲取到期的50個job
local jobIds = redis.call('zrangebyscore',KEYS[1], 0, ARGV[4], 'withscores', 'limit', 0, 50)
local res = {}
for k,jobId in ipairs(jobIds) do
if k%2~=0 then
local jobKey = string.format('%s:%s', ARGV[3], jobId)
local status = redis.call('hget', jobKey, 'status')
-- 檢驗job狀態
if tonumber(status) == tonumber(ARGV[1]) or tonumber(status) == tonumber(ARGV[2]) then
-- 先移除集合中到期的job,而後到期的job返回給timer
local isDel = redis.call('zrem', KEYS[1], jobId)
if isDel == 1 then
table.insert(res, jobId)
end
end
end
end
local nextTime
-- 計算下一個job執行時間,用於設置timer下一個時鐘週期
local nextJob = redis.call('zrange', KEYS[1], 0, 0, 'withscores')
if next(nextJob) == nil then
nextTime = -1
else
nextTime = tonumber(nextJob[2]) - tonumber(ARGV[4])
if nextTime < 0 then
nextTime = 1
end
end
table.insert(res,1,nextTime)
return res
複製代碼
可能通常phper寫業務不多會接觸到鏈接池,其實這是由php自己所決定他應用不大,固然在php的擴展swoole
仍是頗有用處的
gmq
的redis鏈接池是使用gomodule/redigo/redis
自帶鏈接池,它帶來的好處是限制redis鏈接數,經過複用redis鏈接來減小開銷,另外能夠防止tcp被消耗完,這在生產者大量生成數據時會頗有用
// gmq/mq/redis.go
Redis = &RedisDB{
Pool: &redis.Pool{
MaxIdle: 30, // 最大空閒連接
MaxActive: 10000, // 最大連接
IdleTimeout: 240 * time.Second, // 空閒連接超時
Wait: true, // 當鏈接池耗盡時,是否阻塞等待
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword(""))
if err != nil {
return nil, err
}
return c, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
},
}
複製代碼
job pool
是惟一的,它將做爲redis的緩存鍵;gmq
不自動爲job生成惟一id值是爲了用戶能夠根據本身生成的job.id來追蹤job狀況,若是job.id是重複的,push時會報重複id的錯誤ready queue
的速度取決與redis性能,而不是bucket數量netstat -anp | grep 9503 | wc -l
tcp 0 0 10.8.8.188:41482 10.8.8.185:9503 TIME_WAIT -
複製代碼
這個是正常現象,由tcp四次揮手能夠知道,當接收到LAST_ACK發出的FIN後會處於TIME_WAIT
狀態,主動關閉方(客戶端)爲了確保被動關閉方(服務端)收到ACK,會等待2MSL時間,這個時間是爲了再次發送ACK,例如被動關閉方可能由於接收不到ACK而重傳FIN;另外也是爲了舊數據過時,不影響到下一個連接,; 若是要避免大量TIME_WAIT
的鏈接致使tcp被耗盡;通常方法以下:
TIME_WAIT
狀態的鏈接json
外,可支持protobuf
序列化