Go語言異步服務器框架原理和實現

    Go語言類庫中,有兩個官方的服務器框架,一個HTTP,一個是RPC。使用這個兩個框架,已經能解決大部分的問題,可是,也有一些需求,這些框架是不夠的,這篇文章,咱們先分析一下HTTP 和 RPC服務器的特色, 而後結合這兩個服務器的特色,我實現了一個新的服務器,這個服務器很是適合客戶端和服務器端有大量交互的狀況。數據庫

HTTP服務器的特色:json

    HTTP的請求 和 響應的週期以下:緩存

    image

對於一個HTTP 長鏈接,一個請求必須等到一個響應完成後,才能進行下一個請求。這就是http協議最本質的特色,是串行化的。而這個特色保證了http協議的簡潔性,一個請求中間不會插入其餘的請求干擾,這樣不須要去對應請求和響應。可是,同時也有個弱點,那就是不適合作大量的請求。舉個實際中咱們遇到的例子,咱們要把大量的中國客戶的訂單送入英國的交易所,交易所的接口是http協議的,從中國到英國,一次http的請求到響應至少須要 300ms左右,這樣一秒一個連只能發送3個,就算是開十個線程發送(接口對線程總數是有限制的),1s 也只能是30個。而最高峯的時候,咱們可能1s 要發送1萬個訂單,那採用http協議就不能知足咱們的要求了(這個能夠經過fix協議解決)。安全

固然,http能夠解決批量提交的需求,只要增長一個批量提交的接口就能夠了。可是,這樣的實現方式不夠天然,並且增長了額外的接口。服務器

 

RPC服務的特色:網絡

PRC服務器克服了http服務器串流模型,能夠併發的提交請求。請求響應的週期圖以下:session

image

RPC服務,已經能夠客服http服務器的串流的劣勢,能夠批量提交大量的數據。在局域網的中測試,1s鍾能夠實現3萬次左右的請求。而相同的條件下,http在局域網中,只能實現1500次左右的請求,真實環境下面,延時嚴重,http性能會急劇降低。在兩個不一樣的機房中,有百兆帶寬相連,實際測試rpc請求是兩萬次左右,http是 500次左右,並且http佔用不少頭部的帶寬。數據結構

RPC的一個核心特色是相似一次函數調用。這樣一個請求 只能 對應於 一個響應。在某些情下,這彷佛是不夠的。舉個實際的例子,我要獲取一個報價的行情數據,這個時候,相似一個MessageQueue,服務器會不斷的push數據給客戶端。也就是一次請求,會有屢次返回,持續不斷的返回。架構

固然,RPC的一個很是重要的優點是,你不須要知道怎麼去解析數據,你能夠當作網絡是空氣,徹底像寫本地調用函數同樣去調用rpc的函數。併發

異步服務器:

由於暫時我沒有很好的名字來命名這個服務器,因此暫時就叫作異步服務器吧,這個服務器的特色相似一個界面程序的消息體系。咱們不斷的吧鼠標鍵盤等各類事件提交給界面程序,界面程序根據消息的類型,參數作出相應的處理。因此,咱們就叫作異步服務器吧。經典的金融服務器都是異步服務器,處理機制都相似界面的消息循環機制,好比國內期貨最經常使用的ctp交易系統,還有就是銀行間,交易所和銀行之間,常常用的一個協議叫作 fix,也是這樣的架構。請求是一種消息,響應也是一種消息。請求響應的時序圖以下:

image

msg1 請求以後,有兩個響應,Resp1 , resp2,

msg2 有一個響應 resp3.

 

借鑑了rpc的特色,請求和響應都自動編碼,寫服務器再也不爲編碼而煩惱,同時也不須要爲是否要壓縮而頭痛。如今提供三種方式,gob , json, protocolbuffer. 而且能夠 設置是否啓用壓縮的,以及壓縮的格式。我

們把客戶端和服務器的交互抽象爲一個消息系統,先來看看客戶端客戶端調用

   1: client, err := NewClient("http://localhost:8080", jar, "gob", "gzip")
   2: if err != nil {
   3:     log.Println(err)
   4:     return
   5: }
   6: defer client.Close()
   7: req := NewRequest("hello", "jack", func(call *Call, status int) {
   8:     log.Println(call, call.Resp, status)
   9: })
  10: client.Go(req)
  11: req2 := NewRequest("hello", "fuck", func(call *Call, status int) {
  12:     log.Println(call, call.Resp, status)
  13: })
  14: client.Go(req2)
  15: //wait for all req is done
  16: client.Wait()

1-6行,咱們創建了一個到服務器的鏈接,注意,咱們這個服務器底層是用http包實現的。jar 是用來管理session的,這裏暫時忽略,gob是編碼,gzip是壓縮格式。能夠動態設置各類編碼和壓縮格式。

7-13行,NewRequest 的第一個參數是消息的類型(我建議再後面的版本中,改爲NewMessage, Client.GO 改爲 client.Send),叫作hello, 詳細類型爲了方便查看也打印,我採用字符串的格式。後面是消息的參數,能夠是任何的go的結構,變量。每一個請求對應一個回調函數,處理響應的消息,響應的消息保存在 call.Resp 裏面,若是status == StatusDone , 表示請求結束了,服務器不會響應任何消息了,status  == StatusUpdate ,說明,還會有下一個消息過來。

16行 Wait函數,其實就是一個消息循環函數,不斷的從服務器端讀取消息,對應到某個請求的回調函數裏面。相似event loop

咱們在Client裏面加入心跳函數,保證能檢查到連接損壞的狀況,若是鏈接損壞,會自動結束消息循環,錯誤處理是一個服務器很是重要的一環。

 

而後咱們再來看看服務器端的實現:

   1: func helloWorld(w *ResponseWriter, r *Request) {
   2:     resp := w.Resp
   3:     resp.MsgType = MsgTString
   4:     //表示我已經沒有其餘數據包了,這個請求已經結束了
   5:     resp.Done = true
   6:     //向客戶端發送請求
   7:     w.WriteResponse(resp, "hello: " + r.GetBody().(string))
   8: }

第7行中,r.GetBody() 獲取的到是上面NewRequest 中的第二個參數。

這樣就是一個最簡單的hello world 程序。要實現一個實戰有用的服務器,的細節固然還有不少,主要的是流量控制。好比,一個用戶寫錯程序了,錯誤的發起了10萬個請求,服務器端不能開個10萬個go進行處理,這樣的話,會直接拖垮服務器,咱們給每一個用戶設置了一個併發處理數目,最多這個用戶能夠併發處理多少個請求。還有一個比較重要的,對服務器來講,就是服務器服務的量的限制。咱們會實時監控 cpu 內存,io的使用狀況,當發現使用到某個限額的時候,服務會拒絕接受鏈接(事先要對性能進行測試)這些都是爲了防止服務器過載 ,而實際中的服務器,這個問題實際上是很常見的。

 

實例:可靠消息通知系統。

    可靠消息通知系統其實是一個很是常見的系統。最經常使用的一個例子就是數據庫的master slave 模式。master裏面的事件要很是可靠的通知到slave,中間不能有任何的丟失。還有一種好比交易系統中,咱們會調用銀行或者交易所的接口,銀行在交易成功後會給咱們一個通知,這個通知的消息必須可靠的被通知到目標,不能有任何的丟失。在咱們的系統中,行情數據的複製也是不能有任何數據丟失的情景,爲了保證A 服務器 和 B服務器有相同的行情,在從A服務器的消息要被B服務器準確的接收。固然,你也能夠作一個聊天系統,這個聊天系統不會丟失任何消息。

    那麼如何實現這個系統呢,首先,爲了保證不在內存中丟失消息,那麼消息必須寫盤,而且爲了檢測消息是否丟失,必須給消息編號。消息寫盤也能夠用咱們開發的事務日誌系統,若是消息很是的大量,那麼還須要批量提交模式(Group Commit)。大部分狀況下,消息丟失不是由於服務器崩潰,並且網絡意外中斷,這些中斷每每時間很短,在1分鐘之內,因此,有必要在內存中緩存部分的消息,若是網絡中斷,客戶端再次請求時,發送當時的消息序號,這樣就能夠補全網絡中斷丟失的數據。若是時間太長了,內存中的數據不夠補了,那麼首先要從消息源數據庫中下載歷史消息,而後再接受實時的消息。總體的思路就是這樣的,在這裏,咱們就看看咱們的消息通知系統的實時廣播部分的設計。

    1. 消息廣播基本流程: 訂閱 –> 廣播:

首先客戶端向服務器說明,我要訂閱哪些消息,好比,master slave 中,我只要寫消息就行了,讀消息就不須要了。而後,再向服務器請求數據,服務器廣播數據給咱們。注意,咱們這裏把訂閱 和 廣播分紅兩個部分,兩個請求,那麼怎麼知道這兩個請求是同一我的發出的呢?或者,怎麼關聯起來呢?這裏,我用了一個session的概念,訂閱的時候,把訂閱的消息類型保存到session,廣播的時候,從session中讀取消息類型,而後發送對應的數據。

這部分的代碼以下:

   1: var bmu sync.Mutex
   2: var defaultBroadcast = make(map[int64]*Broadcast)
   3: var ErrNotRingItemer = errors.New("ErrNotRingItemer")
   4: //基本上能夠保證有1個小時的數據
   5: const btickSize = 3600 * 4
   6: //能夠傳遞任意的數據
   7:  
   8: func GetBroadcast(name int64, n int) (*Broadcast, error) {
   9:     bmu.Lock()
  10:     defer bmu.Unlock()
  11:     b, ok := defaultBroadcast[name]
  12:     if ok {
  13:         return b, nil
  14:     }
  15:     b , err := NewBroadcast(name, n)
  16:     if err != nil {
  17:         return nil, err
  18:     }
  19:     defaultBroadcast[name] = b
  20:     return b, nil
  21: }
  22:  
  23: type Broadcast struct {
  24:     mu sync.RWMutex
  25:     targets map[int64]*Subscribe
  26:     ringbuffer *algo.RingBuffer
  27:     name int64
  28: }
  29:  
  30: func NewBroadcast(name int64, n int) (*Broadcast, error) {
  31:     b := &Broadcast{}
  32:     b.targets = make(map[int64]*Subscribe)
  33:     b.ringbuffer = algo.NewRingBuffer(n, nil)
  34:     b.name = name
  35:     return b, nil
  36: }
  37:  
  38: func (b *Broadcast) GetName() int64 {
  39:     return b.name
  40: }
  41:  
  42: func (b *Broadcast) Sub(id int64, req *Subscribe) {
  43:     b.mu.Lock()
  44:     defer b.mu.Unlock()
  45:     b.targets[id] = req
  46: }
  47:  
  48: func (b *Broadcast) Unsub(id int64) {
  49:     b.mu.Lock()
  50:     defer b.mu.Unlock()
  51:     delete(b.targets, id)
  52: }
  53:  
  54: //是否在buffer內部
  55: func (b *Broadcast) InBuffer(start int64, end int64) (bool, error) {
  56:     return b.ringbuffer.InBuffer(start, end)
  57: }
  58:  
  59: func (b *Broadcast) Query(start int64, end int64, ty int64) (algo.Iterator, error) {
  60:     find := &algo.RingFind{start, end, ty}
  61:     return b.ringbuffer.Find(find, true) //模糊查找,不是精確匹配
  62: }
  63:  
  64: //若是要提供查詢功能,那麼就要緩存數據,通常採用ringbuffer
  65: //data要知足下面的條件:
  66: //1. 存在一個遞增着的ID
  67: //2. 實現BufferItemer接口
  68: func (b *Broadcast) Push(item algo.RingItemer) error {
  69:     b.mu.RLock()
  70:     defer b.mu.RUnlock()
  71:     item2, err := b.ringbuffer.Push(item)
  72:     if err != nil {
  73:         return err
  74:     }
  75:     for _, v := range b.targets {
  76:         //過濾不想發送的
  77:         if (v.Check(b.name, item2.Type)) {
  78:             v.Send(item)
  79:         }
  80:     }
  81:     return nil
  82: }
  83:  
  84: func (b *Broadcast) Find(find *algo.RingFind) (algo.Iterator, error) {
  85:     return b.ringbuffer.Find(find, true)
  86: }
  87:  
  88: type Subscribe struct {
  89:     mu sync.Mutex
  90:     ch chan interface{}
  91:     tys map[int64]int64
  92: }
  93:  
  94: func NewSubscribe(n int) (*Subscribe) {
  95:     s := &Subscribe{}
  96:     s.ch = make(chan interface{}, n)
  97:     s.tys = make(map[int64]int64)
  98:     return s
  99: }
 100:  
 101: func (s *Subscribe) Add(bname int64, ty int64) {
 102:     s.mu.Lock()
 103:     defer s.mu.Unlock()
 104:     s.tys[bname] = ty
 105: }
 106:  
 107: func (s *Subscribe) Check(bname int64, dataty int64) bool {
 108:     s.mu.Lock()
 109:     defer s.mu.Unlock()
 110:     ty, ok := s.tys[bname]
 111:     if !ok { //沒有訂閱
 112:         return false
 113:     }
 114:     if ty == algo.AnyType || dataty == ty {
 115:         return true
 116:     }
 117:     return false
 118: }
 119:  
 120: func (s *Subscribe) Read(buf []interface{}) (int) {
 121:     var i = 1
 122:     buf[0] = <-s.ch
 123:     for {
 124:         if i == len(buf) {
 125:             return i
 126:         }
 127:         select {
 128:         case data := <-s.ch:
 129:             buf[i] = data
 130:             i++
 131:         default:
 132:             return i
 133:         }
 134:     }
 135:     panic("nerver reach")
 136: }
 137:  
 138: func (s *Subscribe) Send(data interface{}) {
 139:      select {
 140:      case s.ch <- data :
 141:      default:
 142:          //清除舊的數據
 143:          s.Clear()
 144:          //發送結束標誌位
 145:          s.ch <- nil
 146:      }
 147: }
 148:  
 149: func (s *Subscribe) Clear() {
 150:     for {
 151:         select {
 152:         case <-s.ch:
 153:         default:
 154:             return
 155:         }
 156:     }
 157: }
 158:  

這裏,有個數據結構叫作RingBuffer, 是一個環狀的buffer,很是適合作緩存固定數目的數據,用於廣播。廣播是用管道來傳輸數據的,管道的性能實際上已經很是的高,不須要什麼無鎖隊列之類的。在這裏也給管道加上buffer使得,消息意外的擾動,不會使得帶寬不夠用而立馬堵塞。

 

2. 接受消息:

在用戶登陸後,若是有權限,那麼就能夠做爲消息源客戶端,消息源的代碼以下:

   1: func pushTick(w *asyn.ResponseWriter, r *asyn.Request) {
   2:     event := r.GetBody().(*response.OrderBookEvent)
   3:     b, _ := GetBroadcast(event.InstrumentId, btickSize)
   4:     b.Push(event)
   5:     asyn.Log().Println(event)
   6:     asyn.OKHandle(w, r)
   7: }

第2行: 從請求中獲取 消息事件。

第3行: event.InstrumentId 是消息的類型,btickSzie 是緩存的數據數目。

第6行: 向客戶端發送OK,確認消息發送成功。

每一個消息是否發送成功,都有確認。這樣,客戶端就知道上次消息發送到哪裏了。

 

3. 訂閱:

   1: func subscribe(w *asyn.ResponseWriter, r *asyn.Request) {
   2:     instId := r.GetBody().(int64)
   3:     log.Println("sub", instId)
   4:     b, err := GetBroadcast(instId, btickSize)
   5:     if err != nil {
   6:         r.SetErr(err)
   7:         asyn.ErrorHandle(w, r)
   8:         return
   9:     }
  10:     //訂閱的size
  11:     //get and set 要成爲一個原子操做
  12:     session := r.GetSession()
  13:     session.Get3("subscribe", func (data interface{}) interface{} {
  14:         if data == nil {
  15:             data = NewSubscribe(4096)
  16:         }
  17:         sub := data.(*Subscribe)
  18:         //廣播, 類型
  19:         id := int64(uintptr(unsafe.Pointer(session)))
  20:         sub.Add(instId, algo.AnyType)
  21:         b.Sub(id, sub)
  22:         session.OnDelete(func () {
  23:             b.Unsub(id)
  24:         })
  25:         return sub
  26:     })
  27:     asyn.OKHandle(w, r)
  28: }
 
 

第2行:獲取消息的類型,經過這個類型,能夠找到對應的廣播對象。

第12-30行:這是一個線程安全的session操做,具體看一下session.Get3 的實現就知道了:

   1: func (s *Session) Get3(name string, callback func (interface{}) interface{}) interface{} {
   2:     s.mu.Lock()
   3:     defer s.mu.Unlock()
   4:     data, err := s.get(name)
   5:     if err != nil {
   6:         data = nil
   7:     }
   8:     data = callback(data)
   9:     s.set(name, data)
  10:     return data
  11: }

s.get 獲取session的數據,若是沒有session數據,那麼爲nil。簡單的說,這裏的意思是:若是session 「subscribe」 若是尚未設置,那麼就新建一個對象,若是已經設置了,那麼讀取這個對象,而且,這個操做是線程安全的。

這裏還添加了一個session撤銷時候的操做。

4. 廣播:

   1: //讀取廣播數據
   2: func read(w *asyn.ResponseWriter, r *asyn.Request) {
   3:     session := r.GetSession()
   4:     //從session 中獲取subscribe 對象
   5:     sub := session.Get3("subscribe", func (data interface{}) interface{} {
   6:         if data == nil {
   7:             data = NewSubscribe(4096)
   8:         }
   9:         return data
  10:     }).(*Subscribe)
  11:     depth := r.GetBody().(int)
  12:     log.Println("get subscribe")
  13:     resp := w.Resp
  14:     if depth == 0 {
  15:         resp.MsgType = "ticks"
  16:     } else {
  17:         resp.MsgType = "ticks1"
  18:     }
  19:     buf := make([]interface{}, 1024)
  20:     dg := make([]*response.OrderBookEvent, 1024)
  21:     tick1 := make([]*base.TickGo, 1024)
  22:     for {
  23:         n := sub.Read(buf)
  24:         for i := 0; i < n; i++ {
  25:             if buf[i] == nil {
  26:                 //close by broadcast
  27:                 r.SetErr(errors.New("501"))
  28:                 asyn.ErrorHandle(w, r)
  29:                 return
  30:             }
  31:             if depth == 0 {
  32:                 dg[i] = buf[i].(*response.OrderBookEvent)
  33:             } else {
  34:                 tick1[i] = buf[i].(*response.OrderBookEvent).ToTickGo()
  35:             }
  36:         }
  37:         var err error
  38:         if depth == 0 {
  39:             err = w.WriteResponse(resp, dg[:n])
  40:         } else {
  41:             err = w.WriteResponse(resp, tick1[:n])
  42:         }
  43:         if err != nil {
  44:             r.SetErr(err)
  45:             asyn.ErrorHandle(w, r)
  46:             return
  47:         }
  48:     }
  49: }

 

read 有個depth參數,這是行情的深度。股票期貨裏面都有後這個概念。傳說中的幾檔行情。

第26行:這裏有個close。通常來講,是由於網絡擁堵 或者 異常,沒法發送數據了。

還有一點要注意,這裏的行情是批量發送的。sub.Read 儘量多的讀取數據,減小網絡io的次數。

固然,服務器框架自己提供了心跳機制,對消息廣播系統,實時性是很是重要的,即時的檢查出網絡異常,才能保證明時性。

 

以上是對咱們的異步消息服務器框架的一個簡單的介紹。設計這框架,很是重要的兩個理念:

1. 模塊化的設計,一個功能,就對應一個函數。

2. 模塊之間的通信採用session,而對於比較複雜的通信,能夠本身創建一個線程安全的數據結構,好比這裏的Broadcast 和 Subscribe

相關文章
相關標籤/搜索