做者:林冠宏 / 指尖下的幽靈html
掘金:juejin.im/user/587f0d…git
博客:www.cnblogs.com/linguanh/github
GitHub : github.com/af913337456…redis
騰訊雲專欄: cloud.tencent.com/developer/u…數據庫
蟲洞區塊鏈專欄:www.chongdongshequ.com/article/153…json
目前的開發工做主要是將
傳統電商應用
和區塊鏈技術
相結合,區塊鏈平臺依然是以太坊
,此外地,這幾天由我編寫,經清華大學出版社出版的書籍,歷經八月,終於出版上架了,名稱是:《區塊鏈以太坊DApp開發實戰》
,現已能夠網購。後端
本文所要分享的思路就是電商應用中經常使用的
訂單隊列
。緩存
電商應用中,簡單直觀的用戶從下單到付款,最終完成整個流程的步驟能夠用下圖表示:服務器
其中,訂單信息持久化
,就是存儲數據到數據庫中。而最終客戶端完成支付後的更新訂單狀態
的操做是由第三方支付平臺進行回調設置好的回調連接 NotifyUrl
,來進行的。網絡
補全訂單狀態的更新流程,以下圖表示:
服務端的直接瓶頸點
,首先要考慮 TPS
。去除細分點,咱們主要看訂單信息持久化
瓶頸點。
在高併發業務場景中,例如 秒殺
、優惠價搶購
等。短期內的下單請求數會不少,若是訂單信息持久化
部分,不作優化,而是直接對數據庫層進行頻繁的
讀寫操做,數據庫會承受不了,容易成爲第一個垮掉的服務,好比下圖的所示的常規寫單流程:
能夠看到,每
持久化一個訂單信息,通常要經歷網絡鏈接操做(連接數據庫),以及多個 I/O
操做。
得益於鏈接池
技術,咱們能夠在連接數據庫的時候,不用每次都從新發起一次完整的HTTP請求,而能夠直接從池中獲取已打開了的鏈接句柄,而直接使用,這點和線程池的原理差很少。
此外,咱們還能夠在上面的流程中加入更多的優化,例如對於一些須要讀取的信息,能夠事先存置到內存緩存層,並加於更新維護,這樣在使用的時候,能夠快速讀取。
即便咱們都具有了上述的一些優化手段,可是對於寫操做
的I/O
阻塞耗時,在高併發請求
的時候,依然容易致使數據庫承受不住,容易出現連接多開異常
,操做超時
等問題。
在該層進行優化的操做,除了上面談到的以外,還有下面一些手段:
每種方式有各自的特色,由於本文談的是訂單隊列
的架構思想,因此下面咱們來看下如何在訂單系統中引入訂單隊列。
網上有很多文章談到訂單隊列的作法,大部分都漏了說明請求與響應的一致性問題。
第一種訂單隊列
流程圖:上圖是大多文章提到的隊列模型,有兩個沒有解析的問題:
notifyUrl
,而此時 ② 還在排隊等待處理,這種狀況又如何處理。首先,要確定的是,上面的訂單流程圖是沒有問題的。它有下面的優缺點,所提到的兩個問題也是有解決方案的。
優勢:
搭配中間件
的組合性強。缺點:
上面談及的問題點,我後面都會給出解決方案。下面咱們來看下另一種訂單隊列流程圖。
第二種訂單隊列
流程圖:第二種訂單隊列的設計模型,注意它的同步等待
持久化處理的結果,解決了持久化與響應的一致性問題,可是有個嚴重的耗時等待問題,它的優缺點以下:
優勢:
缺點:
這類訂單隊列,我下面會放出 Golang
實現的版本代碼。
對比上面兩種常見的訂單模型,若是從用戶體驗的角度
去優先考慮,第一種不須要用戶等待持久化處理
結果的是明顯優於第二種的。若是技術團隊完善,且技術過硬,也應該考慮第一種的實現方式。
若是僅僅想要達到寧願用戶等待到超時
也不肯意存儲層服務被沖垮,那麼有限考慮第二種。
在這裏,咱們進一步細分一下,實現隊列模塊的功能有哪些選擇。
相信不少後端開發經驗比較老道的同志已經想到了,使用現有的中間件,好比知名的 Redis
、RocketMQ
,以及 Kafka
等,它們都是一種選擇。
此外地,咱們還能夠直接編寫代碼,在當前的服務系統中實現一個消息隊列來達到目的,下面我用圖來分類下隊列類型。
不一樣的隊列實現方式,能直接致使不一樣的功能,也有不一樣的優缺點:
一級緩存優勢:
一級緩存缺點:
中間件的優勢:
增量
持久化,能最大程度減小因不可預料的崩潰致使訂單信息丟失;中間件的缺點:
回到第一種訂單模型中:
問題1:
若是訂單存在第三方支付狀況,① 和 ② 的一致性如何保證?
首先咱們看下,不一致性的時候,會產生什麼結果:
響應頁面
完成了支付動做,用戶查看訂單信息爲空白。上述的狀況,明顯地,只有 3 是須要恢復訂單信息的,應對的方案有:
表A
。同時啓動一個定時任務B
專門遍歷表A,而後去訂單列表尋找是否已經有了對應的訂單信息,有則更新,沒則繼續,或跟隨制定的檢測策略走。非崩潰性緣由
而致使失敗時:
隊列頭部
,等待下一次的從新持久化處理。崩潰性
緣由而致使失敗時:
定時任務B
在進行了屢次檢測無果後,那麼根據第三方支付平臺在回調時候傳遞過來的訂單附屬信息
對訂單進行恢復。定時任務B
所在服務最好
和回調連接 notifyUrl
所在的接口服務一致,這樣能保證當 B 掛掉的時候,回調服務也跟隨掛掉,而後第三方支付平臺在調用回調失敗的狀況下,他們會有重試邏輯
,依賴這個,在回調服務重啓時,能夠完成訂單信息恢復。問題2:
若是訂單存在第三方支付狀況,① 完成了支付,且三方支付平臺回調了 notifyUrl,而此時 ② 還在排隊等待處理,這種狀況又如何處理?
應對的方案參考 問題1
的 定時任務B
檢測修改機制。
定義一些常量
const (
QueueOrderKey = "order_queue"
QueueBufferSize = 1024 // 請求隊列大小
QueueHandleTime = time.Second * 7 // 單個 mission 超時時間
)
複製代碼
定義出入隊接口,方便多種實現
// 定義出入隊接口,方便多種實現
type IQueue interface {
Push(key string,data []byte) error
Pop(key string) ([]byte,error)
}
複製代碼
定義請求與響應實體
// 定義請求與響應實體
type QueueTimeoutResp struct {
Timeout bool // 超時標誌位
Response chan interface{}
}
type QueueRequest struct {
ReqId string `json:"req_id"` // 單次請求 id
Order *model.OrderCombine `json:"order"` // 訂單信息 bean
AccessTime int64 `json:"access_time"` // 請求時間
ResponseChan *QueueTimeoutResp `json:"-"`
}
複製代碼
定義隊列實體
// 定義隊列實體
type Queue struct {
mapLock sync.Mutex
RequestChan chan *QueueRequest // 緩存管道,裝載請求
RequestMap map[string]*QueueTimeoutResp
Queue IQueue
}
複製代碼
實例化隊列,接收接口參數
// 實例化隊列,接收接口參數
func NewQueue(queue IQueue) *Queue {
return &Queue{
mapLock: sync.Mutex{},
RequestChan: make(chan *QueueRequest, QueueBufferSize),
RequestMap: make(map[string]*QueueTimeoutResp, QueueBufferSize),
Queue: queue,
}
}
複製代碼
接收請求
// 接收請求
func (q *Queue) AcceptRequest(req *QueueRequest) interface{} {
if req.ResponseChan == nil {
req.ResponseChan = &QueueTimeoutResp{
Timeout: false,
Response: make(chan interface{},1),
}
}
userKey := key(req) // 惟一 key 生成函數
req.ReqId = userKey
q.mapLock.Lock()
q.RequestMap[userKey] = req.ResponseChan // 內存層存儲對應的 req 的 resp 管道指針
q.mapLock.Unlock()
q.RequestChan <- req // 接收請求
log("userKey : ", userKey)
ticker := time.NewTicker(QueueHandleTime) // 以超時時間 QueueHandleTime 啓動一個定時器
defer func() {
ticker.Stop() // 釋放定時器
q.mapLock.Lock()
delete(q.RequestMap,userKey) // 當處理完一個 req,從 map 中移出
q.mapLock.Unlock()
}()
select {
case <-ticker.C: // 超時
req.ResponseChan.Timeout = true
Queue_TimeoutCounter++ // 輔助計數,int 類型
log("timeout: ",userKey)
return lghError.HandleTimeOut // 返回超時錯誤的信息
case result := <-req.ResponseChan.Response: // req 被完整處理
return result
}
}
複製代碼
從請求管道中取出 req 放入到隊列容器中,該函數在 gorutine
中運行
// 從請求管道中取出 req 放入到隊列容器中,該函數在 gorutine 中運行
func (q *Queue) addToQueue() {
for {
req := <-q.RequestChan // 取出一個 req
data, err := json.Marshal(req)
if err != nil {
log("redis queue parse req failed : ", err.Error())
continue
}
if err = q.Queue.Push(QueueOrderKey, data);err != nil { // push 入隊,這裏有時間消耗
log("lpush req failed. Error : ", err.Error())
continue
}
log("lpush req success. req time: ", req.AccessTime)
}
}
複製代碼
取出 req 處理,該函數在 gorutine
中運行
// 取出 req 處理,該函數在 gorutine 中運行
func (q *Queue) readFromQueue() {
for {
data, err := q.Queue.Pop(QueueOrderKey) // pop 出隊,這裏也有時間消耗
if err != nil {
log("lpop failed. Error : ", err.Error())
continue
}
if data == nil || len(data) == 0 {
time.Sleep(time.Millisecond * 100) // 空數據的 req,停頓下再取
continue
}
req := &QueueRequest{}
if err = json.Unmarshal(data, req);err != nil {
log("Lpop: json.Unmarshal failed. Error : ", err.Error())
continue
}
userKey := req.ReqId
q.mapLock.Lock()
resultChan, ok := q.RequestMap[userKey] // 取出對應的 resp 管道指針
q.mapLock.Unlock()
if !ok {
// 中間件重啓時,好比 redis 重啓而讀取舊 key,會進入這裏
Queue_KeyNotFound ++ // 計數 int 類型
log("key not found, rollback: ", userKey)
continue
}
simulationTimeOutReq4(req) // 模擬出來任務的函數,入參爲 req
if resultChan.Timeout {
// 處理期間,已經超時,這裏作能夠拓展回滾操做
Queue_MissionTimeout ++
log("handle mission timeout: ", userKey)
continue
}
log("request result send to chan succeee, userKey : ", userKey)
ret := util.GetCommonSuccess(req.AccessTime)
resultChan.Response <- &ret // 輸入處理成功
}
}
複製代碼
啓動
func (q *Queue) Start() {
go q.addToQueue()
go q.readFromQueue()
}
複製代碼
運行例子
func test(){
...
runtime.GOMAXPROCS(4)
redisQueue := NewQueue(NewFastCacheQueue())
redisQueue.Start()
reqNumber := testReqNumber
wg := sync.WaitGroup{}
wg.Add(reqNumber)
for i :=0;i<reqNumber;i++ {
go func(index int) {
combine := model.OrderCombine{}
ret := AcceptRequest(&QueueRequest{
UserId: int64(index),
Order: &combine,
AccessTime: time.Now().Unix(),
ResponseChan: nil,
})
fmt.Println("ret: ------------- ",ret.String())
wg.Done()
}(i)
}
wg.Wait()
time.Sleep(3*time.Second)
fmt.Println("TimeoutCounter: ",Queue_TimeoutCounter,"KeyNotFound: ",Queue_KeyNotFound,"MissionTimeout: ",Queue_MissionTimeout)
}
複製代碼