用Go實現的簡易TCP通訊框架

接觸到GO以後,GO的網絡支持很是使人喜歡。GO實現了在語法層面上能夠保持同步語義,可是卻又沒有犧牲太多性能,底層同樣使用了IO路徑複用,好比在LINUX下用了EPOLL,在WINDOWS下用了IOCP。git

可是在開發服務端程序的時候,不少都是被動觸發的,都是客戶端發送來的請求須要處理。天生就是一個event-based的程序。而在GO下,由於併發是做爲語言的一部分,goroutine, channel等特性則很容易的使程序員在實現功能時從容的在同步與異步之間進行轉換。程序員

由於本身的須要,我針對event-based場景的服務端作了簡易的封裝。具體代碼見這裏.github

設計原則

由於GO的IO機制和併發原語的原生支持,再加上對網絡API的封裝,程序員能夠簡單的實現一個高效的服務端或者客戶端程序。通常的實現就是調用net.Listen(「tcp4」, address)獲得一個net.Listener,而後無限循環調用net.Listener.Accept,以後就能夠獲得一個net.Conn,能夠調用net.Conn的接口設置發送和接收緩衝區大小,能夠設置KEEPALIVE等。由於TCP的雙工特性,因此能夠針對一個net.Conn能夠專門啓動一個goroutine去無限循環接收對端發來的數據,而後解包等。網絡

個人想法是在這個簡單實現的基礎上作一層薄薄的封裝,使其儘可能的精簡,可是又不失靈活。但願可以適應不一樣的協議,對使用者形成儘可能小的約束。session

Session對象

該對象就是對net.Conn的一個簡易封裝,能夠經過swnet.Server.AcceptLoop獲得,也能夠經過swnet.NewSession建立新的對象,這種通常是客戶端情境下使用。獲得Session對象後,能夠調用Start方法開始工做。之因此還暴露出一個方法叫Start是由於在服務端下,可能會有某些需求,好比針對IP設置了ACL,那麼,把Start行爲交給使用者決定如何調用。可是這裏須要注意的是,若是使用者不想Start,使用者有責任本身Close掉,不然會形成資源泄露。併發

Start後,會啓動兩個goroutine,一個用於專門接收對端發來的數據,一個專門用來發送數據到對端。想發送數據到對端,能夠用AsyncSend方法,該方法會把要發送的數據排隊到發送通道。這裏使用通道的緣由是由於在服務端情境下,有必要對發送的數據進行排隊,防止發送很快,可是對端接收很慢,或者過多的調用AsyncSend方法,致使堆積了太多的數據,增長了內存的壓力。經過channel來控制發送速率我認爲是比較合理的。同時,還提供了方法能夠用來修改channel的長度,一是調用NewSession時傳入指定大小,二是調用Session.SetSendChannelSize設置大小,可是要注意的是,調用此方法時必須在Start以前完成,不然會產生錯誤。這樣作的緣由也是由於不必動態更改發送通道大小。框架

若是發送channel滿了,AsyncSend方法會返回ErrSendChanBlocking。增長這個錯誤類型也是由於上面的設計致使的。不返回這個錯誤,就沒有辦法讓使用者獲得處理該問題的機會。使用者若是拿到該錯誤,能夠本身試着分析問題的緣由,或者能夠嘗試循環發送,或者直接丟棄該次的發送數據。總之可以讓使用者獲得本身處理的機會。異步

若是Session對象已經Close了,那麼調用AsyncSend會返回ErrStoped錯誤。除此以外,由於AsyncSend是把數據排隊到發送channel中,那麼使用者有責任確保發送的數據在發送完成前不會修改。tcp

若是數據發送失敗,或者其餘緣由,個人實現是直接粗暴的Close掉該Session。函數

還有就是,可能有些用例情景下,會發送比較大的數據包,好比64K大小,或者32K大小的數據等,未了避免反覆申請內存,特此爲Session增長了SetSendCallback方法。能夠設置一個回調函數,用於在發送完成後能夠調用該回調,給予使用者回收數據對象的機會,好比能夠配合sync.Pool使用。雖然我本身測試時並無太大的效果。

爲了方便使用者設置一些net.Conn參數,增長了一個RawConn方法,能夠獲取到net.Conn 的實例。這裏實際上是挺糾結的。由於暴露出這個內部資源後,會給予使用者一個很是大的靈活度。它能夠直接繞過Session的發送channel,本身玩本身的。不過出於方便使用者使用的目的,我仍是這麼作了。使用者本身承擔相應的責任。其實這裏還能夠像net.HTTP那樣增長一個Hijack方法,讓使用者本身接管net.Conn,本身玩本身的。

Session中的不少SET/GET方法都是沒有加鎖的。一方面是由於不少操做在Start前一次完成,或者是GET的數據不是那麼緊密的。

有些時候,若是一個Session被關閉了,可能須要知道這個行爲。因此提供了SetCloseCallback方法,能夠設置該方法。不設置也沒有關係。調用closeCallback時會確保只調用一次。

協議序列化抽象

由於目標之一就是可以隔離具體協議格式。因此對協議作了抽象。只須要實現PacketProtocol接口便可:

// PacketReader is used to unmarshal a complete packet from buff
type PacketReader interface {
    // Read data from conn and build a complete packet.
    // How to read from conn is up to you. You can set read timeout or other option.
    // If buff's capacity is small, you can make a new buff, then return it,
    // so can reuse to reduce memory overhead.
    ReadPacket(conn net.Conn, buff []byte) (interface{}, []byte, error)
}

// PacketWriter is used to marshal packet into buff
type PacketWriter interface {
    // Build a complete packet. If buff's capacity is too small,  you can make a new one
    // and return it to reuse.
    BuildPacket(packet interface{}, buff []byte) ([]byte, error)

    // How to write data to conn is up to you. So you can set write timeout or other option.
    WritePacket(conn net.Conn, buff []byte) error
}

// PacketProtocol just a composite interface
type PacketProtocol interface {
    PacketReader
    PacketWriter
}

也就是實現PacketReader/PacketWriter兩個接口。爲了讓內存儘可能的複用,減小內存壓力,因此在ReadPacket方法和BuildPacket方法的返回值中須要返回一個切片。框架會在第一次調用時傳入一個默認大小的切片到這兩個方法中,若是容量不夠,使用者能夠本身從新創建切片,而後寫入數據後返回該切片。下一次再實用時就使用這個返回出來的切片。

其中ReadPacket方法是在一個專門用於接收數據的goroutine中調用。實現者能夠本身根據本身的策略進行讀取,由於傳入了net.Conn,因此使用者能夠本身設置I/O Timeout。實現者有責任返回一個完整的請求包。若是中間出了錯誤,有必要返回一個error。當發現有error後,會關閉該Session。這樣作的緣由是當讀取或者構建一個請求包失敗時,多是數據錯誤,多是鏈路錯誤,或者其餘緣由,總之,我的認爲這種狀況下沒有必要繼續處理,直接關閉連接。並且這裏還有一個須要注意的事項,返回出來的請求包中的數據若是有包含切片類型的數據,建議從新分配一個切片,而後從buff中拷貝進去,儘可能不要對buff切片作複用,不然可能會產生額外的BUG。

BuildPacket方法是在一個專門處理髮送的goroutine中調用。當發送goroutine收到數據包後,會調用BuildPacket,實現者就能夠按照本身的私有格式進行序列化。一樣的,buff不夠,就本身從新構造一個buff,而後填充數據,並返回這個buff。

WritePacket是給予實現者本身個性化發送的需求。可能實現者須要設置I/O Timeout.

請求包路由

基於event-based的實現,老是少不了要作的事情就是把一個請求包轉發到對應的處理函數中。可是具體怎麼轉,怎麼作是取決於具體的用例情景和實現的。因此我這裏作的很是簡單,就是定義了一個PacketHandler接口:

// PacketHandler is used to process packet that recved from remote session
type PacketHandler interface {
    // When got a valid packet from PacketReader, you can dispatch it.
    Handle(s *Session, packet interface{})
}

使用者本身實現對應的Handle方法便可。當接收數據的goroutine收到對端發來的數據並調用PacketReader.ReadPacket後,會調用Handle方法 ,傳入該Session實例與請求包。傳入Session的目的是方便使用者不用去維護一個Session的實例。由於有的程序員要實現的邏輯可能比較簡單,他僅僅用Session就知足了他的需求,他只須要實現對應的處理函數就行了。處理完成後,就調用Session.AsyncSend發送迴應包。

這裏其實能夠提供一個簡單的默認版本的實現的。可是考慮到協議的不一樣,那麼就致使調度的key的不一樣,因此仍是讓使用者本身發揮吧。

使用者其實在這裏有很大的自由度,他能夠作基於map關係的回調分發邏輯,也能夠作一個簡單的實現邏輯,而後經過type assert作相應的實現。具體也是看各自的口味而定。我是比較喜歡後者,能夠減小不少的Register,實現出Actor Model + Pattern Match味道的東西。

Server對象

這裏還要說一下對服務端的一個簡易封裝。Server的實現很是簡單,就是反覆的去Accept,而後構造一個Session,以後就是調用用戶傳入的回調函數,就完活了。使用者能夠本身傳入net.Listener,能夠傳入PacketProtocol, PacketHandler以及SendChanSize。這些參數會在構造Session時傳入進去,能夠減小重複的代碼實現。Server.AcceptLoop不會關閉構造出來的Session,使用者負責完成這件事情!

缺點

總體很是簡陋,只是搭了一個模製。在我本身未公開的代碼裏,實際上是實現了我所在公司的協議,實現了PacketProtocol。爲此還專門寫了個代碼生成器。

還有就是NewServer須要傳入一個net.Listener,比較蛋疼。後面再決定是否幹掉。NewSession須要傳入net.Conn,實際上是妥協的產物,由於net.Listener返回的就是net.Conn,這個實例須要交給Session使用,不得已而爲之,可是這裏囧的是,客戶端使用的時候,須要本身去net.Dial,獲得一個net.Conn,也許該提供一個swnet.Dial方法。

總結

我這個發佈的代碼是在原有的代碼基礎上進行了修改,從達達的https://github.com/funny/link中獲得了一些啓發,可是又有不少的不一樣。再次感謝達達的貢獻。

相關文章
相關標籤/搜索