歡迎關注「Keegan小鋼」公衆號獲取更多文章git
撮合引擎開發:開篇github
撮合引擎開發:MVP版本web
撮合引擎開發:數據結構設計redis
撮合引擎開發:對接黑箱spring
撮合引擎開發:解密黑箱流程json
咱們要開始聊代碼實現邏輯了,若是不記得以前講的目錄結構,請回去翻看前文。聊代碼實現的第一步天然從程序入口開始,核心就兩個函數:init() 和 main(),其代碼以下:bash
package main
... //other codes
func init() {
initViper()
initLog()
engine.Init()
middleware.Init()
process.Init()
}
func main() {
mux := http.NewServeMux()
mux.HandleFunc("/openMatching", handler.OpenMatching)
mux.HandleFunc("/closeMatching", handler.CloseMatching)
mux.HandleFunc("/handleOrder", handler.HandleOrder)
log.Printf("HTTP ListenAndServe at port %s", viper.GetString("server.port"))
if err := http.ListenAndServe(viper.GetString("server.port"), mux); err != nil {
panic(err)
}
}
複製代碼
init() 函數作了一些初始化的操做,我來簡單介紹這幾個初始化函數:數據結構
viper 和 redis 的初始化都是參照官方 demo 寫的,這裏就不展開說明了。log 後續再單獨講。engine 包和 process 包的初始化就須要好好講講。app
其中,引擎包的初始化雖然很是簡單,但很關鍵,其代碼寫在 engine/init.go 文件中,完整代碼以下:
package engine
var ChanMap map[string]chan Order
func Init() {
ChanMap = make(map[string]chan Order)
}
複製代碼
這個保存通道的 map,其 Key 是各交易標的的 symbol,便是說每一個交易標的各有一個訂單通道,這些訂單通道將做爲每一個交易標的的定序隊列。
process 包的初始化則以下:
func Init() {
symbols := cache.GetSymbols()
for _, symbol := range symbols {
price := cache.GetPrice(symbol)
NewEngine(symbol, price)
orderIds := cache.GetOrderIdsWithAction(symbol)
for _, orderId := range orderIds {
mapOrder := cache.GetOrder(symbol, orderId)
order := engine.Order{}
order.FromMap(mapOrder)
engine.ChanMap[order.Symbol] <- order
}
}
}
複製代碼
簡單講解下實現邏輯:
若是對這裏面有些設計邏輯還不太明白的話,也不要緊,後面講到對應模塊時會再詳細說明。
main() 函數裏,定義了咱們以前所說的三個接口,分別交由對應的 handler 去處理具體的請求,以後就啓動 http 服務了。
由於只有幾個接口,並且也很簡單,所以,並無引入第三方 web 框架,handler 都是用原生實現的。先來看看 OpenMatching 的完整實現:
package handler
import (
"encoding/json"
"io/ioutil"
"net/http"
"strings"
"matching/errcode"
"matching/process"
"github.com/shopspring/decimal"
)
type openMatchingParams struct {
Symbol string `json:"symbol"`
Price decimal.Decimal `json:"price"`
}
func OpenMatching(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
var params openMatchingParams
if err := json.Unmarshal(body, ¶ms); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
if strings.TrimSpace(params.Symbol) == "" {
w.Write(errcode.BlankSymbol.ToJson())
return
}
if params.Price.IsNegative() {
w.Write(errcode.InvalidPrice.ToJson())
return
}
if e := process.NewEngine(params.Symbol, params.Price); !e.IsOK() {
w.Write(e.ToJson())
return
}
w.Write(errcode.OK.ToJson())
}
複製代碼
邏輯很是簡單,先判斷是否爲 POST 請求,再讀取 body 裏的數據並轉爲結構體對象,接着對參數作個簡單的檢查,最後就調用 process.NewEngine(symbol, price) 進入下一步的業務邏輯,若是結果返回是 OK,也返回 OK 做爲請求的響應。
另外,用到了第三方的 decimal.Decimal 類型用來表示價格,整個程序都統一用 decimal 來表示浮點數和作精確計算。
CloseMatching 和 HandleOrder 的實現邏輯也是同理,CloseMatching 最後會調用 process.CloseEngine(symbol) 函數進入下一步的處理,HandleOrder 最後則調用 process.Dispatch(order) 進入下一步。不過,Order 結構體是定義在 engine 包的,其結構以下:
type Order struct {
Action enum.OrderAction `json:"action"`
Symbol string `json:"symbol"`
OrderId string `json:"orderId"`
Side enum.OrderSide `json:"side"`
Type enum.OrderType `json:"type"`
Amount decimal.Decimal `json:"amount"`
Price decimal.Decimal `json:"price"`
Timestamp int64 `json:"timestamp"`
}
複製代碼
能夠看到,其中的字段,除了有 Decimal 類型,還有 enum 包的幾個類型,這幾個實際上是咱們程序中本身定義的枚舉類型。Golang 語言自己並無提供和其餘語言同樣的 enum 關鍵字來定義枚舉類型,因此通常採用類型定義+常量來模擬枚舉類型,以 enum.OrderAction 爲例:
type OrderAction string
const (
ActionCreate OrderAction = "create"
ActionCancel OrderAction = "cancel"
)
複製代碼
其餘幾個枚舉類型也是這樣定義的。
另外,爲了方便轉爲字符串和檢驗參數是否有效,程序中還爲每一個枚舉類型分別提供了兩個函數,仍是以 OrderAction 爲例:
func (o OrderAction) String() string {
switch o {
case ActionCreate:
return "create"
case ActionCancel:
return "cancel"
default:
return "unknown"
}
}
func (o OrderAction) Valid() bool {
if o.String() == "unknown" {
return false
}
return true
}
複製代碼
其餘幾個枚舉類型也都定義了相似的兩個函數,就再也不貼代碼了。
來回顧下 process 包有哪些文件:
└── process #
├── close_engine.go # 關閉引擎
├── dispatch.go # 分發訂單
├── init.go # 初始化
└── new_engine.go # 啓動新引擎
複製代碼
init.go 就一個初始化函數,上文已經講了。其餘三個文件分別定義了上文三個 handler 對應的下一步邏輯實現。
先來看看 new_engine.go:
package process
import (
"matching/engine"
"matching/errcode"
"matching/middleware/cache"
"github.com/shopspring/decimal"
)
func NewEngine(symbol string, price decimal.Decimal) *errcode.Errcode {
if engine.ChanMap[symbol] != nil {
return errcode.EngineExist
}
engine.ChanMap[symbol] = make(chan engine.Order, 100)
go engine.Run(symbol, price)
cache.SaveSymbol(symbol)
cache.SavePrice(symbol, price)
return errcode.OK
}
複製代碼
邏輯也是比較簡單的,第一步先判斷 ChanMap[symbol] 是否爲空,該 ChanMap 就是上文所說的引擎包初始化時用來保存訂單通道的 map。若是 ChanMap[symbol] 不爲空,說明該 symbol 的撮合引擎已經啓動過了,那就返回錯誤。若是爲空,那就初始化這個 symbol 的通道,從代碼可知,ChanMap[symbol] 初始化爲一個緩衝大小爲 100 的訂單通道。
接着,就調用 engine.Run() 啓動一個 goroutine 了,這行代碼即表示用 goroutine 的方式啓動指定 symbol 的撮合引擎了。
而後,就將 symbol 和 price 都緩存起來了。
最後,返回 OK,搞定。
接着,來看看 Dispatch 的實現又是怎樣的:
func Dispatch(order engine.Order) *errcode.Errcode {
if engine.ChanMap[order.Symbol] == nil {
return errcode.EngineNotFound
}
if order.Action == enum.ActionCreate {
if cache.OrderExist(order.Symbol, order.OrderId, order.Action.String()) {
return errcode.OrderExist
}
} else {
if !cache.OrderExist(order.Symbol, order.OrderId, enum.ActionCreate.String()) {
return errcode.OrderNotFound
}
}
order.Timestamp = time.Now().UnixNano() / 1e3
cache.SaveOrder(order.ToMap())
engine.ChanMap[order.Symbol] <- order
return errcode.OK
}
複製代碼
第一步,判斷 ChanMap[order.Symbol] 是否爲空,若是爲空,表示引擎沒開啓,那就沒法處理訂單。
第二步,判斷訂單是否存在。若是是 create 訂單,那緩存中就不該該查到訂單,不然說明是重複請求。若是是 cancel 訂單,那緩存中若是也查不到訂單,那說明該訂單已經所有成交或已經成功撤單過了。
第三步,將訂單時間設爲當前時間,時間單位是 100 納秒,這能夠保證時間戳長度恰好爲 16 位,保存到 Redis 裏就不會有精度失真的問題。這點後續文章講到 Redis 詳細設計時再說。
第四步,將訂單緩存。
第五步,將訂單傳入對應的訂單通道,對應引擎會從該通道中獲取該訂單進行處理。這一步就實現了訂單的分發。
第六步,返回 OK。
關閉引擎的實現就很是簡單了,請看代碼:
func CloseEngine(symbol string) *errcode.Errcode {
if engine.ChanMap[symbol] == nil {
return errcode.EngineNotFound
}
close(engine.ChanMap[symbol])
return errcode.OK
}
複製代碼
核心代碼就一行,將對應 symbol 的訂單通道關閉。後續的處理實際上是在引擎裏完成的,待會咱們再結合引擎裏的代碼來說解這個設計。
交易引擎 goroutine 的啓動入口就是 engine.Run() 函數,來看看其代碼實現:
func Run(symbol string, price decimal.Decimal) {
lastTradePrice := price
book := &orderBook{}
book.init()
log.Info("engine %s is running", symbol)
for {
order, ok := <-ChanMap[symbol]
if !ok {
log.Info("engine %s is closed", symbol)
delete(ChanMap, symbol)
cache.Clear(symbol)
return
}
log.Info("engine %s receive an order: %s", symbol, order.ToJson())
switch order.Action {
case enum.ActionCreate:
dealCreate(&order, book, &lastTradePrice)
case enum.ActionCancel:
dealCancel(&order, book)
}
}
}
複製代碼
第一步,先定義和初始化了一個 book 變量,該變量就是用來保存整個交易委託帳本。
接着,就是一個 for 循環了,for 循環裏的第一行就是從對應 symbol 的訂單通道里讀取出一個訂單,讀取到訂單時,order 變量就會有值,且 ok 變量爲 true。若是通道里暫時沒有訂單,那就會阻塞在這行代碼,直到從通道中獲取到訂單或通道已關閉的消息。
當通道被關閉以後,最後,從通道中讀取到的 ok 變量則爲 false,固然,在這以前,會先依序讀取完通道里剩下的訂單。當 ok 爲 false 時,引擎裏會執行兩步操做:一是從 ChanMap 中刪除該 symbol 對應的記錄,二是清空該 symbol 對應的緩存數據。最後用 return 來退出 for 循環,這樣,整個 Run() 函數就結束退出了,意味着該引擎也真正關閉了。
當每讀取到一個訂單,就會判斷是下單仍是撤單,而後進行相應的邏輯處理了。
咱們先來看看撤單的邏輯,這個比較簡單:
func dealCancel(order *Order, book *orderBook) {
var ok bool
switch order.Side {
case enum.SideBuy:
ok = book.removeBuyOrder(order)
case enum.SideSell:
ok = book.removeSellOrder(order)
}
cache.RemoveOrder(order.ToMap())
mq.SendCancelResult(order.Symbol, order.OrderId, ok)
log.Info("engine %s, order %s cancel result is %s", order.Symbol, order.OrderId, ok)
}
複製代碼
核心就三個步驟:
下單邏輯就比較複雜了,須要根據不一樣的訂單類型作不一樣的邏輯處理,請看代碼:
func dealCreate(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
switch order.Type {
case enum.TypeLimit:
dealLimit(order, book, lastTradePrice)
case enum.TypeLimitIoc:
dealLimitIoc(order, book, lastTradePrice)
case enum.TypeMarket:
dealMarket(order, book, lastTradePrice)
case enum.TypeMarketTop5:
dealMarketTop5(order, book, lastTradePrice)
case enum.TypeMarketTop10:
dealMarketTop10(order, book, lastTradePrice)
case enum.TypeMarketOpponent:
dealMarketOpponent(order, book, lastTradePrice)
}
}
複製代碼
每一個類型再分買賣方向處理,以 dealLimit() 爲例:
func dealLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
switch order.Side {
case enum.SideBuy:
dealBuyLimit(order, book, lastTradePrice)
case enum.SideSell:
dealSellLimit(order, book, lastTradePrice)
}
}
複製代碼
而後,再來看看 dealBuyLimit() 的處理邏輯:
func dealBuyLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
LOOP:
headOrder := book.getHeadSellOrder()
if headOrder == nil || order.Price.LessThan(headOrder.Price) {
book.addBuyOrder(order)
log.Info("engine %s, a order has added to the orderbook: %s", order.Symbol, order.ToJson())
} else {
matchTrade(headOrder, order, book, lastTradePrice)
if order.Amount.IsPositive() {
goto LOOP
}
}
}
複製代碼
我來解析下這個處理流程:
其中,匹配成交的記錄會做爲一條輸出記錄發送到 MQ。
對其餘類型的處理也是相似的,就再也不一一講解了。
那引擎包的實現就先講到這裏,後續文章再聊其餘部分的實現。
本小節主要仍是經過代碼梳理清楚整個數據流程,包括一些細節上的設計。理解了本文所列舉的這些代碼,也就對整個撮合服務的實現理解一大半了。
此次的思考題:ChanMap 保存的訂單通道是否能夠改用無緩衝的通道?用無緩衝的通道和用有緩衝的通道處理邏輯有哪些不一樣?兩種方案各自的優缺點是什麼?
掃描如下二維碼便可關注公衆號(公衆號名稱:Keegan小鋼)