撮合引擎開發:緩存和MQ

歡迎關注「Keegan小鋼」公衆號獲取更多文章git


撮合引擎開發:開篇github

撮合引擎開發:MVP版本redis

撮合引擎開發:數據結構設計緩存

撮合引擎開發:對接黑箱bash

撮合引擎開發:解密黑箱流程服務器

撮合引擎開發:流程的代碼實現數據結構


中間件

先來回顧下咱們撮合程序項目中關於中間件的目錄結構:分佈式

├── middleware               # 中間件的包
│   ├── cache                # 緩存包
│   │   └── cache.go         # 緩存操做
│   ├── mq                   # 消息隊列包
│   │   └── mq.go            # MQ操做
│   └── redis.go             # 主要作Redis初始化操做
複製代碼

雖然如今只用到了 Redis 一箇中間件,但設計個 middleware 包,會方便之後擴展添加其餘中間件,如 Kafka 或 RocketMQ 等。函數

再將緩存和消息隊列分包,職責上就很分明,應用時也很明確。post

redis.go 就只是作初始化的鏈接,咱們來看看代碼:

package middleware

import (
	"matching/log"

	"github.com/go-redis/redis"
	"github.com/spf13/viper"
)

var RedisClient *redis.Client

func Init() {
	addr := viper.GetString("redis.addr")
	RedisClient = redis.NewClient(&redis.Options{
		Addr:     addr,
		Password: "", // no password set
		DB:       0,  // use default DB
	})

	_, err := RedisClient.Ping().Result()
	if err != nil {
		panic(err)
	} else {
		log.Printf("Connected to redis: %s", addr)
	}
}
複製代碼

其中,viper 是前文說過的第三方配置庫,經過 viper.GetString("redis.addr") 從配置文件讀取出要鏈接的 Redis 的地址,以後就新建一個 Redis 客戶端並鏈接上 Redis 服務器了。

緩存的設計

講數據結構設計時,咱們已經說過,使用緩存的目的主要有兩個:

  1. 請求去重,避免重複提交相同訂單;
  2. 恢復數據,即程序重啓後能恢復全部數據。

還記得上一篇文章講 Dispatch 的實現時,有個判斷訂單是否存在的邏輯嗎?就是讀取緩存中是否已經存在該訂單,從而判別是否爲重複請求或無效請求。以及,還記得 process 包的初始化?就是從緩存中恢復數據的過程。

先了解下,咱們總共緩存了哪些數據:

  • 開啓撮合的交易標的 symbol;

  • 這些交易標的的最新價格;

  • 全部有效的訂單請求,包括下單和撤單請求。

1. 緩存symbol

開啓撮合的交易標的 symbol 會有多個,且不能重複,那其實就能夠保存爲集合 set 類型。我將該 set 的 key 設計爲 matching:symbols,以後,每有一個 symbol 開啓撮合時,就能夠用 Redis 的 sadd 命令將該 symbol 添加進這個集合裏去了。而關閉撮合時,則需用 srem 命令將關閉撮合的 symbol 從集合中移除。讀取全部 symbol 則可用 smembers 命令操做。

程序裏對 symbol 的操做提供了三個函數,分別用來保存 symbol、移除 symbol 和獲取全部 symbol,如下是實現的代碼:

func SaveSymbol(symbol string) {
	key := "matching:symbols"
	RedisClient.SAdd(key, symbol)
}

func RemoveSymbol(symbol string) {
	key := "matching:symbols"
	RedisClient.SRem(key, symbol)
}

func GetSymbols() []string {
	key := "matching:symbols"
	return RedisClient.SMembers(key).Val()
}
複製代碼

2. 緩存價格

交易標的的最新價格則是每一個 symbol 會有一個價格,且無需緩存歷史價格,那我就直接用字符串類型來保存價格,而每一個價格的 key 則包含有各自的 symbol,key 的格式設計爲 matching:price:{symbol},假如要保存的 symbol = 「BTCUSD」,那對應的 key 值就是 matching:price:BTCUSD,保存的 value 值就是 BTCUSD 的最新價格。

咱們也一樣提供了保存價格、獲取價格和刪除價格的三個函數,代碼以下:

func SavePrice(symbol string, price decimal.Decimal) {
	key := "matching:price:" + symbol
	RedisClient.Set(key, price.String(), 0)
}

func GetPrice(symbol string) decimal.Decimal {
	key := "matching:price:" + symbol
	priceStr := RedisClient.Get(key).Val()
	result, err := decimal.NewFromString(priceStr)
	if err != nil {
		result = decimal.Zero
	}
	return result
}

func RemovePrice(symbol string) {
	key := "matching:price:" + symbol
	RedisClient.Del(key)
}
複製代碼

3. 緩存訂單

對訂單的緩存設計則沒那麼簡單了,須要知足兩點要求:

  1. 既能緩存下單請求,也能緩存撤單請求;
  2. 訂單要符合定序要求。

先說下第一點,爲何須要緩存訂單?且爲何下單和撤單請求都須要緩存?

先來解答第一個問題,咱們是在內存中撮合的,每一個交易標的引擎裏各自維護了一個交易委託帳本,程序運行時,這些帳本是直接保存在程序內存裏的。那若是程序退出了,這些帳本都被清空了。若是沒有緩存,那程序重啓後就沒法恢復帳本數據。要知足該需求,就須要緩存帳本里的全部委託單。

關於第二個問題,咱們來考慮這樣一個場景:假如訂單通道里有撤單請求在排隊,而程序並無對撤單請求作緩存,這時程序重啓了,那麼訂單通道里的全部訂單還沒被引擎接收處理以前就被清空了,撤單請求也就沒法恢復了。

所以,程序須要緩存好訂單,且下單和撤單都須要緩存。

再來看第二個要求,爲何要符合定序?咱們知道,訂單通道里的訂單是定序的,交易委託帳本里同價格的訂單也是按時間排序的,那緩存時若是不定序,程序重啓後就難以保證按原有的順序恢復訂單。

那具體要怎麼來設計這個訂單的緩存呢?個人方案是分兩類緩存,第一類保存每一個獨立的訂單請求,包括下單和撤單;第二類分交易標的保存對應 symbol 全部訂單請求的訂單 ID 和 action。

第一類,我設計的 Key 格式爲 matching:order:{symbol}:{orderId}:{action},symbol、orderId 和 action 則是對應訂單的三個變量值。好比,某訂單 symbol = 「BTCUSD」,orderId = 「12345」,action = 「cancel」,那該訂單保存到 Redis 的 Key 值就是 matching:order:BTCUSD:12345:cancel。該 Key 對應的 Value 則是保存整個訂單對象,能夠用 hash 類型存儲。

第二類,我設計的 Key 格式爲 matching:orderids:{symbol},Value 保存的是 sorted set 類型的數據,保存對應 symbol 的全部訂單請求,每條記錄保存的值爲 {orderId}:{action},而 score 值設爲對應訂單的 {timestamp}。用訂單時間做爲 score 就能夠保證定序了。還記得以前文章咱們將訂單時間的單位設爲 100 納秒,保證時間戳長度恰好爲 16 位嗎?這是由於,若是超過 16 位,那 score 將轉爲科學計數法表示,那將會致使數字失真。

根據這樣的設計,那保存訂單時的實現邏輯就如如下代碼所示:

func SaveOrder(order map[string]interface{}) {
	symbol := order["symbol"].(string)
	orderId := order["orderId"].(string)
	timestamp := order["timestamp"].(float64)
	action := order["action"].(string)

	key := "matching:order:" + symbol + ":" + orderId + ":" + action
	RedisClient.HMSet(key, order)

	key = "matching:orderids:" + symbol
	z := &redis.Z{
		Score:  timestamp,
		Member: orderId + ":" + action,
	}
	RedisClient.ZAdd(key, z)
}
複製代碼

另外,還提供了 GetOrder()、UpdateOrder()、RemoveOrder()、OrderExist()、GetOrderIdsWithAction() 等函數。再給大夥看看 GetOrderIdsWithAction() 函數的實現:

func GetOrderIdsWithAction(symbol string) []string {
	key := "matching:orderids:" + symbol
	return RedisClient.ZRange(key, 0, -1).Val()
}
複製代碼

該函數獲得的結果是根據 score 值排好序的,這就是咱們想要的結果。理解了這個設計以後,再翻回去看看 process 包的初始化,你就會明白那些代碼的邏輯了。

MQ的設計

咱們選擇了使用 Redis 的 Stream 數據結構來做爲 MQ 輸出,Stream 數據結構採用了相似 Kafka 的設計,應用起來很方便。但因爲 Redis 運行於內存的特性,相比 Kafka 快速不少,這也是我選擇它來做爲撮合程序的輸出 MQ 的主要緣由。

咱們只有兩類 MQ,撤單結果和成交記錄,發送消息的實現以下:

func SendCancelResult(symbol, orderId string, ok bool) {
	values := map[string]interface{}{"orderId": orderId, "ok": ok}
	a := &redis.XAddArgs{
		Stream:       "matching:cancelresults:" + symbol,
		MaxLenApprox: 1000,
		Values:       values,
	}
	RedisClient.XAdd(a)
}

func SendTrade(symbol string, trade map[string]interface{}) {
	a := &redis.XAddArgs{
		Stream:       "matching:trades:" + symbol,
		MaxLenApprox: 1000,
		Values:       trade,
	}
	RedisClient.XAdd(a)
}
複製代碼

其中,matching:cancelresults:{symbol} 就是撤單結果的 MQ 所屬的 Key,matching:trades:{symbol} 則是成交記錄的 MQ 所屬的 Key。能夠看到,咱們還根據不一樣 symbol 分不一樣 MQ,這樣還方便下游服務能夠根據須要實現分佈式訂閱不一樣 symbol 的 MQ。

小結

本小節講解了緩存和 MQ 的設計與實現,理解了這部分的設計以後,對整個撮合引擎的核心設計也基本能理解了。

最後,依然留幾個思考題:是否能夠不用緩存?若是不用緩存能夠如何解決去重和數據恢復的問題?

做者的我的博客

掃描如下二維碼便可關注公衆號(公衆號名稱:Keegan小鋼)

相關文章
相關標籤/搜索