歡迎關注「Keegan小鋼」公衆號獲取更多文章git
撮合引擎開發:開篇github
撮合引擎開發:MVP版本redis
撮合引擎開發:對接黑箱bash
撮合引擎開發:流程的代碼實現數據結構
先來回顧下咱們撮合程序項目中關於中間件的目錄結構:分佈式
├── middleware # 中間件的包
│ ├── cache # 緩存包
│ │ └── cache.go # 緩存操做
│ ├── mq # 消息隊列包
│ │ └── mq.go # MQ操做
│ └── redis.go # 主要作Redis初始化操做
複製代碼
雖然如今只用到了 Redis 一箇中間件,但設計個 middleware 包,會方便之後擴展添加其餘中間件,如 Kafka 或 RocketMQ 等。函數
再將緩存和消息隊列分包,職責上就很分明,應用時也很明確。post
redis.go 就只是作初始化的鏈接,咱們來看看代碼:
package middleware
import (
"matching/log"
"github.com/go-redis/redis"
"github.com/spf13/viper"
)
var RedisClient *redis.Client
func Init() {
addr := viper.GetString("redis.addr")
RedisClient = redis.NewClient(&redis.Options{
Addr: addr,
Password: "", // no password set
DB: 0, // use default DB
})
_, err := RedisClient.Ping().Result()
if err != nil {
panic(err)
} else {
log.Printf("Connected to redis: %s", addr)
}
}
複製代碼
其中,viper 是前文說過的第三方配置庫,經過 viper.GetString("redis.addr") 從配置文件讀取出要鏈接的 Redis 的地址,以後就新建一個 Redis 客戶端並鏈接上 Redis 服務器了。
講數據結構設計時,咱們已經說過,使用緩存的目的主要有兩個:
還記得上一篇文章講 Dispatch 的實現時,有個判斷訂單是否存在的邏輯嗎?就是讀取緩存中是否已經存在該訂單,從而判別是否爲重複請求或無效請求。以及,還記得 process 包的初始化?就是從緩存中恢復數據的過程。
先了解下,咱們總共緩存了哪些數據:
開啓撮合的交易標的 symbol;
這些交易標的的最新價格;
全部有效的訂單請求,包括下單和撤單請求。
開啓撮合的交易標的 symbol 會有多個,且不能重複,那其實就能夠保存爲集合 set 類型。我將該 set 的 key 設計爲 matching:symbols,以後,每有一個 symbol 開啓撮合時,就能夠用 Redis 的 sadd 命令將該 symbol 添加進這個集合裏去了。而關閉撮合時,則需用 srem 命令將關閉撮合的 symbol 從集合中移除。讀取全部 symbol 則可用 smembers 命令操做。
程序裏對 symbol 的操做提供了三個函數,分別用來保存 symbol、移除 symbol 和獲取全部 symbol,如下是實現的代碼:
func SaveSymbol(symbol string) {
key := "matching:symbols"
RedisClient.SAdd(key, symbol)
}
func RemoveSymbol(symbol string) {
key := "matching:symbols"
RedisClient.SRem(key, symbol)
}
func GetSymbols() []string {
key := "matching:symbols"
return RedisClient.SMembers(key).Val()
}
複製代碼
交易標的的最新價格則是每一個 symbol 會有一個價格,且無需緩存歷史價格,那我就直接用字符串類型來保存價格,而每一個價格的 key 則包含有各自的 symbol,key 的格式設計爲 matching:price:{symbol},假如要保存的 symbol = 「BTCUSD」,那對應的 key 值就是 matching:price:BTCUSD,保存的 value 值就是 BTCUSD 的最新價格。
咱們也一樣提供了保存價格、獲取價格和刪除價格的三個函數,代碼以下:
func SavePrice(symbol string, price decimal.Decimal) {
key := "matching:price:" + symbol
RedisClient.Set(key, price.String(), 0)
}
func GetPrice(symbol string) decimal.Decimal {
key := "matching:price:" + symbol
priceStr := RedisClient.Get(key).Val()
result, err := decimal.NewFromString(priceStr)
if err != nil {
result = decimal.Zero
}
return result
}
func RemovePrice(symbol string) {
key := "matching:price:" + symbol
RedisClient.Del(key)
}
複製代碼
對訂單的緩存設計則沒那麼簡單了,須要知足兩點要求:
先說下第一點,爲何須要緩存訂單?且爲何下單和撤單請求都須要緩存?
先來解答第一個問題,咱們是在內存中撮合的,每一個交易標的引擎裏各自維護了一個交易委託帳本,程序運行時,這些帳本是直接保存在程序內存裏的。那若是程序退出了,這些帳本都被清空了。若是沒有緩存,那程序重啓後就沒法恢復帳本數據。要知足該需求,就須要緩存帳本里的全部委託單。
關於第二個問題,咱們來考慮這樣一個場景:假如訂單通道里有撤單請求在排隊,而程序並無對撤單請求作緩存,這時程序重啓了,那麼訂單通道里的全部訂單還沒被引擎接收處理以前就被清空了,撤單請求也就沒法恢復了。
所以,程序須要緩存好訂單,且下單和撤單都須要緩存。
再來看第二個要求,爲何要符合定序?咱們知道,訂單通道里的訂單是定序的,交易委託帳本里同價格的訂單也是按時間排序的,那緩存時若是不定序,程序重啓後就難以保證按原有的順序恢復訂單。
那具體要怎麼來設計這個訂單的緩存呢?個人方案是分兩類緩存,第一類保存每一個獨立的訂單請求,包括下單和撤單;第二類分交易標的保存對應 symbol 全部訂單請求的訂單 ID 和 action。
第一類,我設計的 Key 格式爲 matching:order:{symbol}:{orderId}:{action},symbol、orderId 和 action 則是對應訂單的三個變量值。好比,某訂單 symbol = 「BTCUSD」,orderId = 「12345」,action = 「cancel」,那該訂單保存到 Redis 的 Key 值就是 matching:order:BTCUSD:12345:cancel。該 Key 對應的 Value 則是保存整個訂單對象,能夠用 hash 類型存儲。
第二類,我設計的 Key 格式爲 matching:orderids:{symbol},Value 保存的是 sorted set 類型的數據,保存對應 symbol 的全部訂單請求,每條記錄保存的值爲 {orderId}:{action},而 score 值設爲對應訂單的 {timestamp}。用訂單時間做爲 score 就能夠保證定序了。還記得以前文章咱們將訂單時間的單位設爲 100 納秒,保證時間戳長度恰好爲 16 位嗎?這是由於,若是超過 16 位,那 score 將轉爲科學計數法表示,那將會致使數字失真。
根據這樣的設計,那保存訂單時的實現邏輯就如如下代碼所示:
func SaveOrder(order map[string]interface{}) {
symbol := order["symbol"].(string)
orderId := order["orderId"].(string)
timestamp := order["timestamp"].(float64)
action := order["action"].(string)
key := "matching:order:" + symbol + ":" + orderId + ":" + action
RedisClient.HMSet(key, order)
key = "matching:orderids:" + symbol
z := &redis.Z{
Score: timestamp,
Member: orderId + ":" + action,
}
RedisClient.ZAdd(key, z)
}
複製代碼
另外,還提供了 GetOrder()、UpdateOrder()、RemoveOrder()、OrderExist()、GetOrderIdsWithAction() 等函數。再給大夥看看 GetOrderIdsWithAction() 函數的實現:
func GetOrderIdsWithAction(symbol string) []string {
key := "matching:orderids:" + symbol
return RedisClient.ZRange(key, 0, -1).Val()
}
複製代碼
該函數獲得的結果是根據 score 值排好序的,這就是咱們想要的結果。理解了這個設計以後,再翻回去看看 process 包的初始化,你就會明白那些代碼的邏輯了。
咱們選擇了使用 Redis 的 Stream 數據結構來做爲 MQ 輸出,Stream 數據結構採用了相似 Kafka 的設計,應用起來很方便。但因爲 Redis 運行於內存的特性,相比 Kafka 快速不少,這也是我選擇它來做爲撮合程序的輸出 MQ 的主要緣由。
咱們只有兩類 MQ,撤單結果和成交記錄,發送消息的實現以下:
func SendCancelResult(symbol, orderId string, ok bool) {
values := map[string]interface{}{"orderId": orderId, "ok": ok}
a := &redis.XAddArgs{
Stream: "matching:cancelresults:" + symbol,
MaxLenApprox: 1000,
Values: values,
}
RedisClient.XAdd(a)
}
func SendTrade(symbol string, trade map[string]interface{}) {
a := &redis.XAddArgs{
Stream: "matching:trades:" + symbol,
MaxLenApprox: 1000,
Values: trade,
}
RedisClient.XAdd(a)
}
複製代碼
其中,matching:cancelresults:{symbol} 就是撤單結果的 MQ 所屬的 Key,matching:trades:{symbol} 則是成交記錄的 MQ 所屬的 Key。能夠看到,咱們還根據不一樣 symbol 分不一樣 MQ,這樣還方便下游服務能夠根據須要實現分佈式訂閱不一樣 symbol 的 MQ。
本小節講解了緩存和 MQ 的設計與實現,理解了這部分的設計以後,對整個撮合引擎的核心設計也基本能理解了。
最後,依然留幾個思考題:是否能夠不用緩存?若是不用緩存能夠如何解決去重和數據恢復的問題?
掃描如下二維碼便可關注公衆號(公衆號名稱:Keegan小鋼)