通常電商應用的訂單隊列架構思想

做者:林冠宏 / 指尖下的幽靈html

掘金:juejin.im/user/587f0d…git

博客:www.cnblogs.com/linguanh/github

GitHub : github.com/af913337456…redis

騰訊雲專欄: cloud.tencent.com/developer/u…數據庫

蟲洞區塊鏈專欄:www.chongdongshequ.com/article/153…json


目錄

  • 前序
  • 通常的訂單流程
  • 思考瓶頸點
  • 訂單隊列
    • 第一種訂單隊列
    • 第二種訂單隊列
    • 總結
  • 實現隊列的選擇
  • 解答
  • 實現隊列的選擇
  • 第二種隊列的 Go 版本例子代碼

前序

目前的開發工做主要是將傳統電商應用區塊鏈技術相結合,區塊鏈平臺依然是以太坊,此外地,這幾天由我編寫,經清華大學出版社出版的書籍,歷經八月,終於出版上架了,名稱是:《區塊鏈以太坊DApp開發實戰》,現已能夠網購。後端

本文所要分享的思路就是電商應用中經常使用的訂單隊列緩存

通常的訂單流程

電商應用中,簡單直觀的用戶從下單到付款,最終完成整個流程的步驟能夠用下圖表示:服務器

其中,訂單信息持久化,就是存儲數據到數據庫中。而最終客戶端完成支付後的更新訂單狀態的操做是由第三方支付平臺進行回調設置好的回調連接 NotifyUrl,來進行的。網絡

補全訂單狀態的更新流程,以下圖表示:

思考瓶頸點

服務端的直接瓶頸點,首先要考慮 TPS。去除細分點,咱們主要看訂單信息持久化瓶頸點。

在高併發業務場景中,例如 秒殺優惠價搶購等。短期內的下單請求數會不少,若是訂單信息持久化 部分,不作優化,而是直接對數據庫層進行頻繁的讀寫操做,數據庫會承受不了,容易成爲第一個垮掉的服務,好比下圖的所示的常規寫單流程:

能夠看到,持久化一個訂單信息,通常要經歷網絡鏈接操做(連接數據庫),以及多個 I/O 操做。

得益於鏈接池技術,咱們能夠在連接數據庫的時候,不用每次都從新發起一次完整的HTTP請求,而能夠直接從池中獲取已打開了的鏈接句柄,而直接使用,這點和線程池的原理差很少。

此外,咱們還能夠在上面的流程中加入更多的優化,例如對於一些須要讀取的信息,能夠事先存置到內存緩存層,並加於更新維護,這樣在使用的時候,能夠快速讀取。

即便咱們都具有了上述的一些優化手段,可是對於寫操做I/O阻塞耗時,在高併發請求的時候,依然容易致使數據庫承受不住,容易出現連接多開異常操做超時等問題。

在該層進行優化的操做,除了上面談到的以外,還有下面一些手段:

  • 數據庫集羣,採用讀寫分離,減小寫時壓力
  • 分庫,不一樣業務的表放到不一樣的數據庫,會引入分佈式事務問題
  • 採用隊列模型削峯

每種方式有各自的特色,由於本文談的是訂單隊列的架構思想,因此下面咱們來看下如何在訂單系統中引入訂單隊列。

訂單隊列

網上有很多文章談到訂單隊列的作法,大部分都漏了說明請求與響應的一致性問題。

第一種訂單隊列流程圖:

上圖是大多文章提到的隊列模型,有兩個沒有解析的問題:

  1. 若是訂單存在第三方支付狀況,① 和 ② 的一致性如何保證,好比其中一到處理失敗;
  2. 若是訂單存在第三方支付狀況,① 完成了支付,且三方支付平臺回調了 notifyUrl,而此時 ② 還在排隊等待處理,這種狀況又如何處理。

首先,要確定的是,上面的訂單流程圖是沒有問題的。它有下面的優缺點,所提到的兩個問題也是有解決方案的。

優勢:

  • 用戶無需等待訂單持久化處理,而能直接得到響應,實現快速下單
  • 持久化處理,採用排隊的先來先處理,不會像上面談到的高併發請求一塊兒衝擊數據庫層面的狀況。
  • 可變性強,搭配中間件的組合性強。

缺點:

  • 多訂單入隊時,② 步驟的處理速度跟不上。從而致使第二點問題。
  • 實現較複雜

上面談及的問題點,我後面都會給出解決方案。下面咱們來看下另一種訂單隊列流程圖。

第二種訂單隊列流程圖:

第二種訂單隊列的設計模型,注意它的同步等待持久化處理的結果,解決了持久化與響應的一致性問題,可是有個嚴重的耗時等待問題,它的優缺點以下:

優勢:

  1. 持久化與響應的強一致性。
  2. 持久化處理,採用排隊的先來先處理,不會像上面談到的高併發請求一塊兒衝擊數據庫層面的狀況。
  3. 實現簡單

缺點:

  1. 多訂單入隊時,持久化單元處理速度跟不上,形成客戶端同步等待響應。

這類訂單隊列,我下面會放出 Golang 實現的版本代碼。

總結

對比上面兩種常見的訂單模型,若是從用戶體驗的角度去優先考慮,第一種不須要用戶等待持久化處理結果的是明顯優於第二種的。若是技術團隊完善,且技術過硬,也應該考慮第一種的實現方式。

若是僅僅想要達到寧願用戶等待到超時也不肯意存儲層服務被沖垮,那麼有限考慮第二種。

實現隊列的選擇

在這裏,咱們進一步細分一下,實現隊列模塊的功能有哪些選擇。

相信不少後端開發經驗比較老道的同志已經想到了,使用現有的中間件,好比知名的 RedisRocketMQ,以及 Kafka 等,它們都是一種選擇。

此外地,咱們還能夠直接編寫代碼,在當前的服務系統中實現一個消息隊列來達到目的,下面我用圖來分類下隊列類型。

不一樣的隊列實現方式,能直接致使不一樣的功能,也有不一樣的優缺點:

一級緩存優勢:

  1. 一級緩存,最快。無需連接,直接從內存層獲取;
  2. 若是不考慮持久化和集羣,那麼它實現簡單。

一級緩存缺點:

  1. 若是考慮持久化和集羣,那麼它實現比較複雜。
  2. 不考慮持久化狀況下,若是服務器斷電或其它緣由致使服務中斷,那麼排隊中的訂單信息將丟失

中間件的優勢:

  1. 軟件成熟,通常出名的消息中間件都是通過實踐使用的,文檔豐富;
  2. 支持多種持久化的策略,好比 Redis 有增量持久化,能最大程度減小因不可預料的崩潰致使訂單信息丟失;
  3. 支持集羣,主從同步,這對於分佈式系統來講,是必不可少的要求。

中間件的缺點:

  1. 分佈式部署時,須要創建連接通信,致使讀寫操做須要走網絡通信。

解答

回到第一種訂單模型中:

問題1:

若是訂單存在第三方支付狀況,① 和 ② 的一致性如何保證?

首先咱們看下,不一致性的時候,會產生什麼結果:

  1. ① 失敗,用戶由於網絡緣由或返回其它頁面,不能獲取結果。而 ② 成功,那麼最終該訂單的狀態是待支付。用戶進入到我的訂單中心完成訂單支付便可;
  2. ① 和 ② 都失敗,那麼下單失敗;
  3. ① 成功,② 失敗,此時用戶在響應頁面完成了支付動做,用戶查看訂單信息爲空白。

上述的狀況,明顯地,只有 3 是須要恢復訂單信息的,應對的方案有:

  • 當服務端支付回調接口被第三方支付平臺訪問時,沒法找到對應的訂單信息。那麼先將這類支付了卻沒訂單信息的數據存儲起來先,好比存儲到表A。同時啓動一個定時任務B專門遍歷表A,而後去訂單列表尋找是否已經有了對應的訂單信息,有則更新,沒則繼續,或跟隨制定的檢測策略走。
  • 當 ② 是因爲服務端的非崩潰性緣由而致使失敗時:
    • 失敗的時候同時將原始訂單數據從新插入到隊列頭部,等待下一次的從新持久化處理。
  • 當 ② 因服務端的崩潰性緣由而致使失敗時:
    • 定時任務B在進行了屢次檢測無果後,那麼根據第三方支付平臺在回調時候傳遞過來的訂單附屬信息對訂單進行恢復。
  • 整個過程訂單恢復的過程,用戶查看訂單信息爲空白。
  • 定時任務B 所在服務最好和回調連接 notifyUrl 所在的接口服務一致,這樣能保證當 B 掛掉的時候,回調服務也跟隨掛掉,而後第三方支付平臺在調用回調失敗的狀況下,他們會有重試邏輯,依賴這個,在回調服務重啓時,能夠完成訂單信息恢復。

問題2:

若是訂單存在第三方支付狀況,① 完成了支付,且三方支付平臺回調了 notifyUrl,而此時 ② 還在排隊等待處理,這種狀況又如何處理?

應對的方案參考 問題1定時任務B 檢測修改機制。

第二種隊列的 Go 版本例子代碼

定義一些常量

const (
	QueueOrderKey   = "order_queue"     
	QueueBufferSize = 1024              // 請求隊列大小
	QueueHandleTime = time.Second * 7   // 單個 mission 超時時間
)
複製代碼

定義出入隊接口,方便多種實現

// 定義出入隊接口,方便多種實現
type IQueue interface {
	Push(key string,data []byte) error
	Pop(key string) ([]byte,error)
}
複製代碼

定義請求與響應實體

// 定義請求與響應實體
type QueueTimeoutResp struct {
	Timeout  bool  // 超時標誌位
	Response chan interface{}
}
type QueueRequest struct {
	ReqId  		string  `json:"req_id"`  // 單次請求 id
	Order       *model.OrderCombine `json:"order"` // 訂單信息 bean
	AccessTime 	int64 	`json:"access_time"` // 請求時間
	ResponseChan *QueueTimeoutResp `json:"-"`
}
複製代碼

定義隊列實體

// 定義隊列實體
type Queue struct {
	mapLock sync.Mutex
	RequestChan  chan *QueueRequest // 緩存管道,裝載請求
	RequestMap   map[string]*QueueTimeoutResp 
	Queue IQueue
}
複製代碼

實例化隊列,接收接口參數

// 實例化隊列,接收接口參數
func NewQueue(queue IQueue) *Queue {
	return &Queue{
		mapLock:     sync.Mutex{},
		RequestChan: make(chan *QueueRequest, QueueBufferSize),
		RequestMap:  make(map[string]*QueueTimeoutResp, QueueBufferSize),
		Queue:       queue,
	}
}
複製代碼

接收請求

// 接收請求
func (q *Queue) AcceptRequest(req *QueueRequest) interface{} {
	if req.ResponseChan == nil {
		req.ResponseChan = &QueueTimeoutResp{
			Timeout:  false,
			Response: make(chan interface{},1),
		}
	}
	userKey := key(req)  // 惟一 key 生成函數
	req.ReqId = userKey
	q.mapLock.Lock()
	q.RequestMap[userKey] = req.ResponseChan // 內存層存儲對應的 req 的 resp 管道指針
	q.mapLock.Unlock()
	q.RequestChan <- req  // 接收請求
	log("userKey : ", userKey)
	ticker := time.NewTicker(QueueHandleTime) // 以超時時間 QueueHandleTime 啓動一個定時器
	defer func() {
		ticker.Stop() // 釋放定時器
		q.mapLock.Lock()
		delete(q.RequestMap,userKey)  // 當處理完一個 req,從 map 中移出
		q.mapLock.Unlock()
	}()

	select {
	case <-ticker.C:  // 超時
		req.ResponseChan.Timeout = true 
		Queue_TimeoutCounter++  // 輔助計數,int 類型
		log("timeout: ",userKey)
		return lghError.HandleTimeOut  // 返回超時錯誤的信息
	case result := <-req.ResponseChan.Response:  // req 被完整處理
		return result
	}
}
複製代碼

從請求管道中取出 req 放入到隊列容器中,該函數在 gorutine 中運行

// 從請求管道中取出 req 放入到隊列容器中,該函數在 gorutine 中運行
func (q *Queue) addToQueue() {
	for {
		req := <-q.RequestChan // 取出一個 req
		data, err := json.Marshal(req)
		if err != nil {
			log("redis queue parse req failed : ", err.Error())
			continue
		}
		if err = q.Queue.Push(QueueOrderKey, data);err != nil {  // push 入隊,這裏有時間消耗
			log("lpush req failed. Error : ", err.Error())
			continue
		}
		log("lpush req success. req time: ", req.AccessTime)
	}
}
複製代碼

取出 req 處理,該函數在 gorutine 中運行

// 取出 req 處理,該函數在 gorutine 中運行
func (q *Queue) readFromQueue() {
	for {
		data, err := q.Queue.Pop(QueueOrderKey) // pop 出隊,這裏也有時間消耗
		if err != nil {
			log("lpop failed. Error : ", err.Error())
			continue
		}
		if data == nil || len(data) == 0 {
			time.Sleep(time.Millisecond * 100) // 空數據的 req,停頓下再取
			continue
		}
		req := &QueueRequest{}
		if err = json.Unmarshal(data, req);err != nil {
			log("Lpop: json.Unmarshal failed. Error : ", err.Error())
			continue
		}
		userKey := req.ReqId
		q.mapLock.Lock()
		resultChan, ok := q.RequestMap[userKey] // 取出對應的 resp 管道指針
		q.mapLock.Unlock()
		if !ok {
			// 中間件重啓時,好比 redis 重啓而讀取舊 key,會進入這裏
			Queue_KeyNotFound ++ // 計數 int 類型
			log("key not found, rollback: ", userKey)
			continue
		}
		simulationTimeOutReq4(req) // 模擬出來任務的函數,入參爲 req 
		if resultChan.Timeout {
			// 處理期間,已經超時,這裏作能夠拓展回滾操做
			Queue_MissionTimeout ++
			log("handle mission timeout: ", userKey)
			continue
		}
		log("request result send to chan succeee, userKey : ", userKey)
		ret := util.GetCommonSuccess(req.AccessTime)
		resultChan.Response <- &ret // 輸入處理成功
	}
}
複製代碼

啓動

func (q *Queue) Start()  {
	go q.addToQueue()
	go q.readFromQueue()
}
複製代碼

運行例子

func test(){
    ...
    runtime.GOMAXPROCS(4)
    redisQueue := NewQueue(NewFastCacheQueue())
    redisQueue.Start()
    reqNumber := testReqNumber
    wg := sync.WaitGroup{}
    wg.Add(reqNumber)
    for i :=0;i<reqNumber;i++ {
    	go func(index int) {
    		combine := model.OrderCombine{}
    		ret := AcceptRequest(&QueueRequest{
    			UserId:       int64(index),
    			Order:        &combine,
    			AccessTime:   time.Now().Unix(),
    			ResponseChan: nil,
    		})
    		fmt.Println("ret: ------------- ",ret.String())
    		wg.Done()
    	}(i)
    }
    wg.Wait()
    time.Sleep(3*time.Second)
    fmt.Println("TimeoutCounter: ",Queue_TimeoutCounter,"KeyNotFound: ",Queue_KeyNotFound,"MissionTimeout: ",Queue_MissionTimeout)
}
複製代碼

最後上傳一張書籍圖片

相關文章
相關標籤/搜索