做者:林冠宏 / 指尖下的幽靈git
博客:http://www.cnblogs.com/linguanh/github
GitHub : https://github.com/af913337456/redis
目前的開發工做主要是將
傳統電商應用
和區塊鏈技術
相結合,區塊鏈平臺依然是以太坊
,此外地,這幾天由我編寫,經清華大學出版社出版的書籍,歷經八月,終於出版上架了,名稱是:《區塊鏈以太坊DApp開發實戰》
,現已能夠網購。數據庫
本文所要分享的思路就是電商應用中經常使用的
訂單隊列
。json
電商應用中,簡單直觀的用戶從下單到付款,最終完成整個流程的步驟能夠用下圖表示:後端
其中,訂單信息持久化
,就是存儲數據到數據庫中。而最終客戶端完成支付後的更新訂單狀態
的操做是由第三方支付平臺進行回調設置好的回調連接 NotifyUrl
,來進行的。緩存
補全訂單狀態的更新流程,以下圖表示:服務器
服務端的直接瓶頸點
,首先要考慮 TPS
。去除細分點,咱們主要看訂單信息持久化
瓶頸點。網絡
在高併發業務場景中,例如 秒殺
、優惠價搶購
等。短期內的下單請求數會不少,若是訂單信息持久化
部分,不作優化,而是直接對數據庫層進行頻繁的
讀寫操做,數據庫會承受不了,容易成爲第一個垮掉的服務,好比下圖的所示的常規寫單流程:架構
能夠看到,每
持久化一個訂單信息,通常要經歷網絡鏈接操做(連接數據庫),以及多個 I/O
操做。
得益於鏈接池
技術,咱們能夠在連接數據庫的時候,不用每次都從新發起一次完整的HTTP請求,而能夠直接從池中獲取已打開了的鏈接句柄,而直接使用,這點和線程池的原理差很少。
此外,咱們還能夠在上面的流程中加入更多的優化,例如對於一些須要讀取的信息,能夠事先存置到內存緩存層,並加於更新維護,這樣在使用的時候,能夠快速讀取。
即便咱們都具有了上述的一些優化手段,可是對於寫操做
的I/O
阻塞耗時,在高併發請求
的時候,依然容易致使數據庫承受不住,容易出現連接多開異常
,操做超時
等問題。
在該層進行優化的操做,除了上面談到的以外,還有下面一些手段:
每種方式有各自的特色,由於本文談的是訂單隊列
的架構思想,因此下面咱們來看下如何在訂單系統中引入訂單隊列。
網上有很多文章談到訂單隊列的作法,大部分都漏了說明請求與響應的一致性問題。
第一種訂單隊列
流程圖:上圖是大多文章提到的隊列模型,有兩個沒有解析的問題:
notifyUrl
,而此時 ② 還在排隊等待處理,這種狀況又如何處理。首先,要確定的是,上面的訂單流程圖是沒有問題的。它有下面的優缺點,所提到的兩個問題也是有解決方案的。
優勢:
搭配中間件
的組合性強。缺點:
上面談及的問題點,我後面都會給出解決方案。下面咱們來看下另一種訂單隊列流程圖。
第二種訂單隊列
流程圖:第二種訂單隊列的設計模型,注意它的同步等待
持久化處理的結果,解決了持久化與響應的一致性問題,可是有個嚴重的耗時等待問題,它的優缺點以下:
優勢:
缺點:
這類訂單隊列,我下面會放出 Golang
實現的版本代碼。
對比上面兩種常見的訂單模型,若是從用戶體驗的角度
去優先考慮,第一種不須要用戶等待持久化處理
結果的是明顯優於第二種的。若是技術團隊完善,且技術過硬,也應該考慮第一種的實現方式。
若是僅僅想要達到寧願用戶等待到超時
也不肯意存儲層服務被沖垮,那麼有限考慮第二種。
在這裏,咱們進一步細分一下,實現隊列模塊的功能有哪些選擇。
相信不少後端開發經驗比較老道的同志已經想到了,使用現有的中間件,好比知名的 Redis
、RocketMQ
,以及 Kafka
等,它們都是一種選擇。
此外地,咱們還能夠直接編寫代碼,在當前的服務系統中實現一個消息隊列來達到目的,下面我用圖來分類下隊列類型。
不一樣的隊列實現方式,能直接致使不一樣的功能,也有不一樣的優缺點:
一級緩存優勢:
一級緩存缺點:
中間件的優勢:
增量
持久化,能最大程度減小因不可預料的崩潰致使訂單信息丟失;中間件的缺點:
回到第一種訂單模型中:
問題1:
若是訂單存在第三方支付狀況,① 和 ② 的一致性如何保證?
首先咱們看下,不一致性的時候,會產生什麼結果:
響應頁面
完成了支付動做,用戶查看訂單信息爲空白。上述的狀況,明顯地,只有 3 是須要恢復訂單信息的,應對的方案有:
表A
。同時啓動一個定時任務B
專門遍歷表A,而後去訂單列表尋找是否已經有了對應的訂單信息,有則更新,沒則繼續,或跟隨制定的檢測策略走。非崩潰性緣由
而致使失敗時:
隊列頭部
,等待下一次的從新持久化處理。崩潰性
緣由而致使失敗時:
定時任務B
在進行了屢次檢測無果後,那麼根據第三方支付平臺在回調時候傳遞過來的訂單附屬信息
對訂單進行恢復。定時任務B
所在服務最好
和回調連接 notifyUrl
所在的接口服務一致,這樣能保證當 B 掛掉的時候,回調服務也跟隨掛掉,而後第三方支付平臺在調用回調失敗的狀況下,他們會有重試邏輯
,依賴這個,在回調服務重啓時,能夠完成訂單信息恢復。問題2:
若是訂單存在第三方支付狀況,① 完成了支付,且三方支付平臺回調了 notifyUrl,而此時 ② 還在排隊等待處理,這種狀況又如何處理?
應對的方案參考 問題1
的 定時任務B
檢測修改機制。
定義一些常量
const ( QueueOrderKey = "order_queue" QueueBufferSize = 1024 // 請求隊列大小 QueueHandleTime = time.Second * 7 // 單個 mission 超時時間 )
定義出入隊接口,方便多種實現
// 定義出入隊接口,方便多種實現 type IQueue interface { Push(key string,data []byte) error Pop(key string) ([]byte,error) }
定義請求與響應實體
// 定義請求與響應實體 type QueueTimeoutResp struct { Timeout bool // 超時標誌位 Response chan interface{} } type QueueRequest struct { ReqId string `json:"req_id"` // 單次請求 id Order *model.OrderCombine `json:"order"` // 訂單信息 bean AccessTime int64 `json:"access_time"` // 請求時間 ResponseChan *QueueTimeoutResp `json:"-"` }
定義隊列實體
// 定義隊列實體 type Queue struct { mapLock sync.Mutex RequestChan chan *QueueRequest // 緩存管道,裝載請求 RequestMap map[string]*QueueTimeoutResp Queue IQueue }
實例化隊列,接收接口參數
// 實例化隊列,接收接口參數 func NewQueue(queue IQueue) *Queue { return &Queue{ mapLock: sync.Mutex{}, RequestChan: make(chan *QueueRequest, QueueBufferSize), RequestMap: make(map[string]*QueueTimeoutResp, QueueBufferSize), Queue: queue, } }
接收請求
// 接收請求 func (q *Queue) AcceptRequest(req *QueueRequest) interface{} { if req.ResponseChan == nil { req.ResponseChan = &QueueTimeoutResp{ Timeout: false, Response: make(chan interface{},1), } } userKey := key(req) // 惟一 key 生成函數 req.ReqId = userKey q.mapLock.Lock() q.RequestMap[userKey] = req.ResponseChan // 內存層存儲對應的 req 的 resp 管道指針 q.mapLock.Unlock() q.RequestChan <- req // 接收請求 log("userKey : ", userKey) ticker := time.NewTicker(QueueHandleTime) // 以超時時間 QueueHandleTime 啓動一個定時器 defer func() { ticker.Stop() // 釋放定時器 q.mapLock.Lock() delete(q.RequestMap,userKey) // 當處理完一個 req,從 map 中移出 q.mapLock.Unlock() }() select { case <-ticker.C: // 超時 req.ResponseChan.Timeout = true Queue_TimeoutCounter++ // 輔助計數,int 類型 log("timeout: ",userKey) return lghError.HandleTimeOut // 返回超時錯誤的信息 case result := <-req.ResponseChan.Response: // req 被完整處理 return result } }
從請求管道中取出 req 放入到隊列容器中,該函數在 gorutine
中運行
// 從請求管道中取出 req 放入到隊列容器中,該函數在 gorutine 中運行 func (q *Queue) addToQueue() { for { req := <-q.RequestChan // 取出一個 req data, err := json.Marshal(req) if err != nil { log("redis queue parse req failed : ", err.Error()) continue } if err = q.Queue.Push(QueueOrderKey, data);err != nil { // push 入隊,這裏有時間消耗 log("lpush req failed. Error : ", err.Error()) continue } log("lpush req success. req time: ", req.AccessTime) } }
取出 req 處理,該函數在 gorutine
中運行
// 取出 req 處理,該函數在 gorutine 中運行 func (q *Queue) readFromQueue() { for { data, err := q.Queue.Pop(QueueOrderKey) // pop 出隊,這裏也有時間消耗 if err != nil { log("lpop failed. Error : ", err.Error()) continue } if data == nil || len(data) == 0 { time.Sleep(time.Millisecond * 100) // 空數據的 req,停頓下再取 continue } req := &QueueRequest{} if err = json.Unmarshal(data, req);err != nil { log("Lpop: json.Unmarshal failed. Error : ", err.Error()) continue } userKey := req.ReqId q.mapLock.Lock() resultChan, ok := q.RequestMap[userKey] // 取出對應的 resp 管道指針 q.mapLock.Unlock() if !ok { // 中間件重啓時,好比 redis 重啓而讀取舊 key,會進入這裏 Queue_KeyNotFound ++ // 計數 int 類型 log("key not found, rollback: ", userKey) continue } simulationTimeOutReq4(req) // 模擬出來任務的函數,入參爲 req if resultChan.Timeout { // 處理期間,已經超時,這裏作能夠拓展回滾操做 Queue_MissionTimeout ++ log("handle mission timeout: ", userKey) continue } log("request result send to chan succeee, userKey : ", userKey) ret := util.GetCommonSuccess(req.AccessTime) resultChan.Response <- &ret // 輸入處理成功 } }
啓動
func (q *Queue) Start() { go q.addToQueue() go q.readFromQueue() }
運行例子
func test(){ ... runtime.GOMAXPROCS(4) redisQueue := NewQueue(NewFastCacheQueue()) redisQueue.Start() reqNumber := testReqNumber wg := sync.WaitGroup{} wg.Add(reqNumber) for i :=0;i<reqNumber;i++ { go func(index int) { combine := model.OrderCombine{} ret := AcceptRequest(&QueueRequest{ UserId: int64(index), Order: &combine, AccessTime: time.Now().Unix(), ResponseChan: nil, }) fmt.Println("ret: ------------- ",ret.String()) wg.Done() }(i) } wg.Wait() time.Sleep(3*time.Second) fmt.Println("TimeoutCounter: ",Queue_TimeoutCounter,"KeyNotFound: ",Queue_KeyNotFound,"MissionTimeout: ",Queue_MissionTimeout) }