撮合引擎開發:流程的代碼實現

歡迎關注「Keegan小鋼」公衆號獲取更多文章git


撮合引擎開發:開篇github

撮合引擎開發:MVP版本web

撮合引擎開發:數據結構設計redis

撮合引擎開發:對接黑箱spring

撮合引擎開發:解密黑箱流程json

撮合引擎開發:流程的代碼實現緩存


程序入口

咱們要開始聊代碼實現邏輯了,若是不記得以前講的目錄結構,請回去翻看前文。聊代碼實現的第一步天然從程序入口開始,核心就兩個函數:init()main(),其代碼以下:bash

package main

... //other codes

func init() {
	initViper()
	initLog()

	engine.Init()
	middleware.Init()
	process.Init()
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/openMatching", handler.OpenMatching)
	mux.HandleFunc("/closeMatching", handler.CloseMatching)
	mux.HandleFunc("/handleOrder", handler.HandleOrder)

	log.Printf("HTTP ListenAndServe at port %s", viper.GetString("server.port"))
	if err := http.ListenAndServe(viper.GetString("server.port"), mux); err != nil {
		panic(err)
	}
}
複製代碼

init() 函數作了一些初始化的操做,我來簡單介紹這幾個初始化函數:數據結構

  • initViper():配置文件初始化,使用了第三方配置庫 viper,這是一個被普遍使用的配置庫,其 github 地址爲 github.com/spf13/viper
  • initLog():日誌初始化,程序主要使用本身定義的日誌包用來輸出日誌文件,該日誌包的實現後續文章再單獨講。
  • engine.Init():引擎包的初始化,只是初始化了一個 map,用來保存不一樣交易標的的訂單 channel,做爲各交易標的的定序隊列來用。
  • middleware.Init():中間件的初始化,咱們用到的中間件就只有 Redis,因此這裏其實就是初始化 Redis 鏈接。Redis 客戶端庫方面我選擇的是 go-redis/redis
  • process.Init():這一步主要是從緩存加載和恢復各交易標的引擎的啓動和全部訂單數據。

viper 和 redis 的初始化都是參照官方 demo 寫的,這裏就不展開說明了。log 後續再單獨講。engine 包和 process 包的初始化就須要好好講講。app

其中,引擎包的初始化雖然很是簡單,但很關鍵,其代碼寫在 engine/init.go 文件中,完整代碼以下:

package engine

var ChanMap map[string]chan Order

func Init() {
	ChanMap = make(map[string]chan Order)
}
複製代碼

這個保存通道的 map,其 Key 是各交易標的的 symbol,便是說每一個交易標的各有一個訂單通道,這些訂單通道將做爲每一個交易標的的定序隊列。

process 包的初始化則以下:

func Init() {
	symbols := cache.GetSymbols()
	for _, symbol := range symbols {
		price := cache.GetPrice(symbol)
		NewEngine(symbol, price)

		orderIds := cache.GetOrderIdsWithAction(symbol)
		for _, orderId := range orderIds {
			mapOrder := cache.GetOrder(symbol, orderId)
			order := engine.Order{}
			order.FromMap(mapOrder)
			engine.ChanMap[order.Symbol] <- order
		}
	}
}
複製代碼

簡單講解下實現邏輯:

  1. 從緩存讀取全部 symbol,即程序重啓以前,已經開啓了撮合的全部交易標的的 symbol;
  2. 從緩存讀取每一個 symbol 對應的價格,這是程序重啓前的最新成交價格;
  3. 啓動每一個 symbol 的撮合引擎;
  4. 從緩存讀取每一個 symbol 的全部訂單,這些訂單都是按時間順序排列的;
  5. 按順序將這些訂單添加到對應 symbol 的訂單通道里去。

若是對這裏面有些設計邏輯還不太明白的話,也不要緊,後面講到對應模塊時會再詳細說明。

main() 函數裏,定義了咱們以前所說的三個接口,分別交由對應的 handler 去處理具體的請求,以後就啓動 http 服務了。

handler

由於只有幾個接口,並且也很簡單,所以,並無引入第三方 web 框架,handler 都是用原生實現的。先來看看 OpenMatching 的完整實現:

package handler

import (
	"encoding/json"
	"io/ioutil"
	"net/http"
	"strings"

	"matching/errcode"
	"matching/process"

	"github.com/shopspring/decimal"
)

type openMatchingParams struct {
	Symbol string          `json:"symbol"`
	Price  decimal.Decimal `json:"price"`
}

func OpenMatching(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	var params openMatchingParams
	if err := json.Unmarshal(body, &params); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	if strings.TrimSpace(params.Symbol) == "" {
		w.Write(errcode.BlankSymbol.ToJson())
		return
	}

	if params.Price.IsNegative() {
		w.Write(errcode.InvalidPrice.ToJson())
		return
	}

	if e := process.NewEngine(params.Symbol, params.Price); !e.IsOK() {
		w.Write(e.ToJson())
		return
	}

	w.Write(errcode.OK.ToJson())
}
複製代碼

邏輯很是簡單,先判斷是否爲 POST 請求,再讀取 body 裏的數據並轉爲結構體對象,接着對參數作個簡單的檢查,最後就調用 process.NewEngine(symbol, price) 進入下一步的業務邏輯,若是結果返回是 OK,也返回 OK 做爲請求的響應。

另外,用到了第三方的 decimal.Decimal 類型用來表示價格,整個程序都統一用 decimal 來表示浮點數和作精確計算。

CloseMatchingHandleOrder 的實現邏輯也是同理,CloseMatching 最後會調用 process.CloseEngine(symbol) 函數進入下一步的處理,HandleOrder 最後則調用 process.Dispatch(order) 進入下一步。不過,Order 結構體是定義在 engine 包的,其結構以下:

type Order struct {
	Action    enum.OrderAction `json:"action"`
	Symbol    string           `json:"symbol"`
	OrderId   string           `json:"orderId"`
	Side      enum.OrderSide   `json:"side"`
	Type      enum.OrderType   `json:"type"`
	Amount    decimal.Decimal  `json:"amount"`
	Price     decimal.Decimal  `json:"price"`
	Timestamp int64            `json:"timestamp"`
}
複製代碼

能夠看到,其中的字段,除了有 Decimal 類型,還有 enum 包的幾個類型,這幾個實際上是咱們程序中本身定義的枚舉類型。Golang 語言自己並無提供和其餘語言同樣的 enum 關鍵字來定義枚舉類型,因此通常採用類型定義+常量來模擬枚舉類型,以 enum.OrderAction 爲例:

type OrderAction string

const (
	ActionCreate OrderAction = "create"
	ActionCancel OrderAction = "cancel"
)
複製代碼

其餘幾個枚舉類型也是這樣定義的。

另外,爲了方便轉爲字符串和檢驗參數是否有效,程序中還爲每一個枚舉類型分別提供了兩個函數,仍是以 OrderAction 爲例:

func (o OrderAction) String() string {
	switch o {
	case ActionCreate:
		return "create"
	case ActionCancel:
		return "cancel"
	default:
		return "unknown"
	}
}

func (o OrderAction) Valid() bool {
	if o.String() == "unknown" {
		return false
	}
	return true
}
複製代碼

其餘幾個枚舉類型也都定義了相似的兩個函數,就再也不貼代碼了。

process 包

來回顧下 process 包有哪些文件:

└── process                  #
    ├── close_engine.go      # 關閉引擎
    ├── dispatch.go          # 分發訂單
    ├── init.go              # 初始化
    └── new_engine.go        # 啓動新引擎
複製代碼

init.go 就一個初始化函數,上文已經講了。其餘三個文件分別定義了上文三個 handler 對應的下一步邏輯實現。

啓動新引擎

先來看看 new_engine.go

package process

import (
	"matching/engine"
	"matching/errcode"
	"matching/middleware/cache"

	"github.com/shopspring/decimal"
)

func NewEngine(symbol string, price decimal.Decimal) *errcode.Errcode {
	if engine.ChanMap[symbol] != nil {
		return errcode.EngineExist
	}

	engine.ChanMap[symbol] = make(chan engine.Order, 100)
	go engine.Run(symbol, price)

	cache.SaveSymbol(symbol)
	cache.SavePrice(symbol, price)

	return errcode.OK
}
複製代碼

邏輯也是比較簡單的,第一步先判斷 ChanMap[symbol] 是否爲空,該 ChanMap 就是上文所說的引擎包初始化時用來保存訂單通道的 map。若是 ChanMap[symbol] 不爲空,說明該 symbol 的撮合引擎已經啓動過了,那就返回錯誤。若是爲空,那就初始化這個 symbol 的通道,從代碼可知,ChanMap[symbol] 初始化爲一個緩衝大小爲 100 的訂單通道。

接着,就調用 engine.Run() 啓動一個 goroutine 了,這行代碼即表示用 goroutine 的方式啓動指定 symbol 的撮合引擎了。

而後,就將 symbol 和 price 都緩存起來了。

最後,返回 OK,搞定。

2. 分發訂單

接着,來看看 Dispatch 的實現又是怎樣的:

func Dispatch(order engine.Order) *errcode.Errcode {
	if engine.ChanMap[order.Symbol] == nil {
		return errcode.EngineNotFound
	}

	if order.Action == enum.ActionCreate {
		if cache.OrderExist(order.Symbol, order.OrderId, order.Action.String()) {
			return errcode.OrderExist
		}
	} else {
		if !cache.OrderExist(order.Symbol, order.OrderId, enum.ActionCreate.String()) {
			return errcode.OrderNotFound
		}
	}

	order.Timestamp = time.Now().UnixNano() / 1e3
	cache.SaveOrder(order.ToMap())
	engine.ChanMap[order.Symbol] <- order

	return errcode.OK
}
複製代碼

第一步,判斷 ChanMap[order.Symbol] 是否爲空,若是爲空,表示引擎沒開啓,那就沒法處理訂單。

第二步,判斷訂單是否存在。若是是 create 訂單,那緩存中就不該該查到訂單,不然說明是重複請求。若是是 cancel 訂單,那緩存中若是也查不到訂單,那說明該訂單已經所有成交或已經成功撤單過了。

第三步,將訂單時間設爲當前時間,時間單位是 100 納秒,這能夠保證時間戳長度恰好爲 16 位,保存到 Redis 裏就不會有精度失真的問題。這點後續文章講到 Redis 詳細設計時再說。

第四步,將訂單緩存。

第五步,將訂單傳入對應的訂單通道,對應引擎會從該通道中獲取該訂單進行處理。這一步就實現了訂單的分發。

第六步,返回 OK。

3. 關閉引擎

關閉引擎的實現就很是簡單了,請看代碼:

func CloseEngine(symbol string) *errcode.Errcode {
	if engine.ChanMap[symbol] == nil {
		return errcode.EngineNotFound
	}

	close(engine.ChanMap[symbol])

	return errcode.OK
}
複製代碼

核心代碼就一行,將對應 symbol 的訂單通道關閉。後續的處理實際上是在引擎裏完成的,待會咱們再結合引擎裏的代碼來說解這個設計。

引擎入口的實現

交易引擎 goroutine 的啓動入口就是 engine.Run() 函數,來看看其代碼實現:

func Run(symbol string, price decimal.Decimal) {
	lastTradePrice := price

	book := &orderBook{}
	book.init()

	log.Info("engine %s is running", symbol)
	for {
		order, ok := <-ChanMap[symbol]
		if !ok {
			log.Info("engine %s is closed", symbol)
			delete(ChanMap, symbol)
			cache.Clear(symbol)
			return
		}
		log.Info("engine %s receive an order: %s", symbol, order.ToJson())
		switch order.Action {
		case enum.ActionCreate:
			dealCreate(&order, book, &lastTradePrice)
		case enum.ActionCancel:
			dealCancel(&order, book)
		}
	}
}
複製代碼

第一步,先定義和初始化了一個 book 變量,該變量就是用來保存整個交易委託帳本

接着,就是一個 for 循環了,for 循環裏的第一行就是從對應 symbol 的訂單通道里讀取出一個訂單,讀取到訂單時,order 變量就會有值,且 ok 變量爲 true。若是通道里暫時沒有訂單,那就會阻塞在這行代碼,直到從通道中獲取到訂單或通道已關閉的消息。

當通道被關閉以後,最後,從通道中讀取到的 ok 變量則爲 false,固然,在這以前,會先依序讀取完通道里剩下的訂單。當 ok 爲 false 時,引擎裏會執行兩步操做:一是從 ChanMap 中刪除該 symbol 對應的記錄,二是清空該 symbol 對應的緩存數據。最後用 return 來退出 for 循環,這樣,整個 Run() 函數就結束退出了,意味着該引擎也真正關閉了。

當每讀取到一個訂單,就會判斷是下單仍是撤單,而後進行相應的邏輯處理了。

咱們先來看看撤單的邏輯,這個比較簡單:

func dealCancel(order *Order, book *orderBook) {
	var ok bool
	switch order.Side {
	case enum.SideBuy:
		ok = book.removeBuyOrder(order)
	case enum.SideSell:
		ok = book.removeSellOrder(order)
	}

	cache.RemoveOrder(order.ToMap())
	mq.SendCancelResult(order.Symbol, order.OrderId, ok)
	log.Info("engine %s, order %s cancel result is %s", order.Symbol, order.OrderId, ok)
}
複製代碼

核心就三個步驟:

  1. 從委託帳本中移除該訂單;
  2. 從緩存中移除該訂單;
  3. 發送撤單結果到 MQ。

下單邏輯就比較複雜了,須要根據不一樣的訂單類型作不一樣的邏輯處理,請看代碼:

func dealCreate(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
	switch order.Type {
	case enum.TypeLimit:
		dealLimit(order, book, lastTradePrice)
	case enum.TypeLimitIoc:
		dealLimitIoc(order, book, lastTradePrice)
	case enum.TypeMarket:
		dealMarket(order, book, lastTradePrice)
	case enum.TypeMarketTop5:
		dealMarketTop5(order, book, lastTradePrice)
	case enum.TypeMarketTop10:
		dealMarketTop10(order, book, lastTradePrice)
	case enum.TypeMarketOpponent:
		dealMarketOpponent(order, book, lastTradePrice)
	}
}
複製代碼

每一個類型再分買賣方向處理,以 dealLimit() 爲例:

func dealLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
	switch order.Side {
	case enum.SideBuy:
		dealBuyLimit(order, book, lastTradePrice)
	case enum.SideSell:
		dealSellLimit(order, book, lastTradePrice)
	}
}
複製代碼

而後,再來看看 dealBuyLimit() 的處理邏輯:

func dealBuyLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
LOOP:
	headOrder := book.getHeadSellOrder()
	if headOrder == nil || order.Price.LessThan(headOrder.Price) {
		book.addBuyOrder(order)
		log.Info("engine %s, a order has added to the orderbook: %s", order.Symbol, order.ToJson())
	} else {
		matchTrade(headOrder, order, book, lastTradePrice)
		if order.Amount.IsPositive() {
			goto LOOP
		}
	}
}
複製代碼

我來解析下這個處理流程:

  1. 從委託帳本中讀取出賣單隊列的頭部訂單;
  2. 若是頭部訂單爲空,或新訂單(買單)價格小於頭部訂單(賣單),則沒法匹配成交,那就將新訂單添加到委託帳本的買單隊列中去;
  3. 若是頭部訂單不爲空,且新訂單(買單)價格大於等於頭部訂單(賣單),則兩個訂單能夠匹配成交,那就對這兩個訂單進行成交處理;
  4. 若是上一步的成交處理完以後,新訂單的剩餘數量還不爲零,那就繼續重複第一步。

其中,匹配成交的記錄會做爲一條輸出記錄發送到 MQ。

對其餘類型的處理也是相似的,就再也不一一講解了。

那引擎包的實現就先講到這裏,後續文章再聊其餘部分的實現。

小結

本小節主要仍是經過代碼梳理清楚整個數據流程,包括一些細節上的設計。理解了本文所列舉的這些代碼,也就對整個撮合服務的實現理解一大半了。

此次的思考題:ChanMap 保存的訂單通道是否能夠改用無緩衝的通道?用無緩衝的通道和用有緩衝的通道處理邏輯有哪些不一樣?兩種方案各自的優缺點是什麼?


掃描如下二維碼便可關注公衆號(公衆號名稱:Keegan小鋼)

做者的我的博客

相關文章
相關標籤/搜索