歡迎關注「Keegan小鋼」公衆號獲取更多文章git
價值超5萬的撮合引擎:開篇github
價值超5萬的撮合引擎:MVP版本redis
撮合引擎開發:對接黑箱服務器
撮合引擎開發:流程的代碼實現數據結構
中間件
先來回顧下咱們撮合程序項目中關於中間件的目錄結構:分佈式
├── middleware # 中間件的包│ ├── cache # 緩存包│ │ └── cache.go # 緩存操做│ ├── mq # 消息隊列包│ │ └── mq.go # MQ操做│ └── redis.go # 主要作Redis初始化操做
雖然如今只用到了 Redis 一箇中間件,但設計個 middleware 包,會方便之後擴展添加其餘中間件,如 Kafka 或 RocketMQ 等。函數
再將緩存和消息隊列分包,職責上就很分明,應用時也很明確。flex
redis.go 就只是作初始化的鏈接,咱們來看看代碼:
package middleware
import ( "matching/log"
"github.com/go-redis/redis" "github.com/spf13/viper")
var RedisClient *redis.Client
func Init() { addr := viper.GetString("redis.addr") RedisClient = redis.NewClient(&redis.Options{ Addr: addr, Password: "", // no password set DB: 0, // use default DB })
_, err := RedisClient.Ping().Result() if err != nil { panic(err) } else { log.Printf("Connected to redis: %s", addr) }}
其中,viper 是前文說過的第三方配置庫,經過 viper.GetString("redis.addr") 從配置文件讀取出要鏈接的 Redis 的地址,以後就新建一個 Redis 客戶端並鏈接上 Redis 服務器了。
緩存的設計
講數據結構設計時,咱們已經說過,使用緩存的目的主要有兩個:
1.請求去重,避免重複提交相同訂單;2.恢復數據,即程序重啓後能恢復全部數據。
還記得上一篇文章講 Dispatch 的實現時,有個判斷訂單是否存在的邏輯嗎?就是讀取緩存中是否已經存在該訂單,從而判別是否爲重複請求或無效請求。以及,還記得 process 包的初始化?就是從緩存中恢復數據的過程。
先了解下,咱們總共緩存了哪些數據:
•
開啓撮合的交易標的 symbol;
•
這些交易標的的最新價格;
•全部有效的訂單請求,包括下單和撤單請求。
1. 緩存symbol
開啓撮合的交易標的 symbol 會有多個,且不能重複,那其實就能夠保存爲集合 set 類型。我將該 set 的 key 設計爲 matching:symbols,以後,每有一個 symbol 開啓撮合時,就能夠用 Redis 的 sadd 命令將該 symbol 添加進這個集合裏去了。而關閉撮合時,則需用 srem 命令將關閉撮合的 symbol 從集合中移除。讀取全部 symbol 則可用 smembers 命令操做。
程序裏對 symbol 的操做提供了三個函數,分別用來保存 symbol、移除 symbol 和獲取全部 symbol,如下是實現的代碼:
func SaveSymbol(symbol string) { key := "matching:symbols" RedisClient.SAdd(key, symbol)}
func RemoveSymbol(symbol string) { key := "matching:symbols" RedisClient.SRem(key, symbol)}
func GetSymbols() []string { key := "matching:symbols" return RedisClient.SMembers(key).Val()}
2. 緩存價格
交易標的的最新價格則是每一個 symbol 會有一個價格,且無需緩存歷史價格,那我就直接用字符串類型來保存價格,而每一個價格的 key 則包含有各自的 symbol,key 的格式設計爲 matching:price:{symbol},假如要保存的 symbol = 「BTCUSD」,那對應的 key 值就是 matching:price:BTCUSD,保存的 value 值就是 BTCUSD 的最新價格。
咱們也一樣提供了保存價格、獲取價格和刪除價格的三個函數,代碼以下:
func SavePrice(symbol string, price decimal.Decimal) { key := "matching:price:" + symbol RedisClient.Set(key, price.String(), 0)}
func GetPrice(symbol string) decimal.Decimal { key := "matching:price:" + symbol priceStr := RedisClient.Get(key).Val() result, err := decimal.NewFromString(priceStr) if err != nil { result = decimal.Zero } return result}
func RemovePrice(symbol string) { key := "matching:price:" + symbol RedisClient.Del(key)}
3. 緩存訂單
對訂單的緩存設計則沒那麼簡單了,須要知足兩點要求:
1.既能緩存下單請求,也能緩存撤單請求;2.訂單要符合定序要求。
先說下第一點,爲何須要緩存訂單?且爲何下單和撤單請求都須要緩存?
先來解答第一個問題,咱們是在內存中撮合的,每一個交易標的引擎裏各自維護了一個交易委託帳本,程序運行時,這些帳本是直接保存在程序內存裏的。那若是程序退出了,這些帳本都被清空了。若是沒有緩存,那程序重啓後就沒法恢復帳本數據。要知足該需求,就須要緩存帳本里的全部委託單。
關於第二個問題,咱們來考慮這樣一個場景:假如訂單通道里有撤單請求在排隊,而程序並無對撤單請求作緩存,這時程序重啓了,那麼訂單通道里的全部訂單還沒被引擎接收處理以前就被清空了,撤單請求也就沒法恢復了。
所以,程序須要緩存好訂單,且下單和撤單都須要緩存。
再來看第二個要求,爲何要符合定序?咱們知道,訂單通道里的訂單是定序的,交易委託帳本里同價格的訂單也是按時間排序的,那緩存時若是不定序,程序重啓後就難以保證按原有的順序恢復訂單。
那具體要怎麼來設計這個訂單的緩存呢?個人方案是分兩類緩存,第一類保存每一個獨立的訂單請求,包括下單和撤單;第二類分交易標的保存對應 symbol 全部訂單請求的訂單 ID 和 action。
第一類,我設計的 Key 格式爲 matching:order:{symbol}:{orderId}:{action},symbol、orderId 和 action 則是對應訂單的三個變量值。好比,某訂單 symbol = 「BTCUSD」,orderId = 「12345」,action = 「cancel」,那該訂單保存到 Redis 的 Key 值就是 matching:order:BTCUSD:12345:cancel。該 Key 對應的 Value 則是保存整個訂單對象,能夠用 hash 類型存儲。
第二類,我設計的 Key 格式爲 matching:orderids:{symbol},Value 保存的是 sorted set 類型的數據,保存對應 symbol 的全部訂單請求,每條記錄保存的值爲 {orderId}:{action},而 score 值設爲對應訂單的 {timestamp}。用訂單時間做爲 score 就能夠保證定序了。還記得以前文章咱們將訂單時間的單位設爲 100 納秒,保證時間戳長度恰好爲 16 位嗎?這是由於,若是超過 16 位,那 score 將轉爲科學計數法表示,那將會致使數字失真。
根據這樣的設計,那保存訂單時的實現邏輯就如如下代碼所示:
func SaveOrder(order map[string]interface{}) { symbol := order["symbol"].(string) orderId := order["orderId"].(string) timestamp := order["timestamp"].(float64) action := order["action"].(string)
key := "matching:order:" + symbol + ":" + orderId + ":" + action RedisClient.HMSet(key, order)
key = "matching:orderids:" + symbol z := &redis.Z{ Score: timestamp, Member: orderId + ":" + action, } RedisClient.ZAdd(key, z)}
另外,還提供了 GetOrder()、UpdateOrder()、RemoveOrder()、OrderExist()、GetOrderIdsWithAction() 等函數。再給大夥看看 GetOrderIdsWithAction() 函數的實現:
func GetOrderIdsWithAction(symbol string) []string { key := "matching:orderids:" + symbol return RedisClient.ZRange(key, 0, -1).Val()}
該函數獲得的結果是根據 score 值排好序的,這就是咱們想要的結果。理解了這個設計以後,再翻回去看看 process 包的初始化,你就會明白那些代碼的邏輯了。
MQ的設計
咱們選擇了使用 Redis 的 Stream 數據結構來做爲 MQ 輸出,Stream 數據結構採用了相似 Kafka 的設計,應用起來很方便。但因爲 Redis 運行於內存的特性,相比 Kafka 快速不少,這也是我選擇它來做爲撮合程序的輸出 MQ 的主要緣由。
咱們只有兩類 MQ,撤單結果和成交記錄,發送消息的實現以下:
func SendCancelResult(symbol, orderId string, ok bool) { values := map[string]interface{}{"orderId": orderId, "ok": ok} a := &redis.XAddArgs{ Stream: "matching:cancelresults:" + symbol, MaxLenApprox: 1000, Values: values, } RedisClient.XAdd(a)}
func SendTrade(symbol string, trade map[string]interface{}) { a := &redis.XAddArgs{ Stream: "matching:trades:" + symbol, MaxLenApprox: 1000, Values: trade, } RedisClient.XAdd(a)}
其中,matching:cancelresults:{symbol} 就是撤單結果的 MQ 所屬的 Key,matching:trades:{symbol} 則是成交記錄的 MQ 所屬的 Key。能夠看到,咱們還根據不一樣 symbol 分不一樣 MQ,這樣還方便下游服務能夠根據須要實現分佈式訂閱不一樣 symbol 的 MQ。
小結
本小節講解了緩存和 MQ 的設計與實現,理解了這部分的設計以後,對整個撮合引擎的核心設計也基本能理解了。
最後,依然留幾個思考題:是否能夠不用緩存?若是不用緩存能夠如何解決去重和數據恢復的問題?
本文分享自微信公衆號 - Keegan小鋼(keeganlee_me)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。