乍一看,經過TCP/IP層鏈接兩個進程會感受可怕, 可是在Go語言中可能比你想象的要簡單的多。git
固然不少狀況下,不是大多數狀況下,使用更高級別的網絡協議毫無疑問會更好,由於可使用華麗的API, 它們隱藏了不少技術細節。如今根據不一樣的需求,有不少選擇,好比消息隊列協議, gRPC, protobuf, FlatBuffers, RESTful網站API, websocket等等。github
然而在一些特殊的場景下,特別是小型項目,選擇任何其餘方式都會感受太臃腫了,更不用說你須要引入額外的依賴包了。golang
幸運的是,使用標準庫的net包來建立簡單的網絡通訊不比你所見到的要困難。web
由於Go語言中有下面兩點簡化。shell
net.Conn接口實現了io.Reader, io.Writer和io.Closer接口。 所以能夠像對待其餘io流同樣對待TCP鏈接。編程
你可能會認爲:"好,我能在TCP中發送字符串或字節分片,很是不錯,可是遇到複雜的數據結構怎麼辦? 例如咱們遇到的是結構體類型的數據?"json
當說到經過網絡發送編碼的結構化數據,首先想到的就是JSON。 不過先稍等一下 - Go語言的標準庫encoding/gob包提供了一種序列化和發序列話Go數據類型的方法,它無需給結構體、Go語言不兼容的JSON添加字符串標籤, 或者等待使用json.Unmarshal來費勁的將文本解析爲二進制數據。安全
gob編碼解碼能夠直接操做io流,這一點很完美的匹配第一條簡化。bash
下面咱們就經過這兩條簡化規則一塊兒實現一個簡單的App。服務器
這個app應該作兩件事情:
第一部分,發送簡單字符串,將演示無需藉助高級協議的狀況下,經過TCP/IP網絡發送數據是多麼簡單。
第二部分,稍微深刻一點,經過網絡發送完整的結構體,這些結構體使用字符串、分片、映射、甚至包含到自身的遞歸指針。
辛虧有gob包,要作到這些不費吹灰之力。
客戶端 服務端 待發送結構體 解碼後結構體 testStruct結構體 testStruct結構體 | ^ V | gob編碼 ----------------------------> gob解碼 | ^ V | 發送 ============網絡================= 接收
發送字符串須要三個簡單的步驟:
net包提供了一對實現這個功能的方法。
這兩個方法都是在go源碼的src/net/tcpsock.go文件中定義的。
func ResolveTCPAddr(network, address string) (*TCPAddr, error) { switch network { case "tcp", "tcp4", "tcp6": case "": // a hint wildcard for Go 1.0 undocumented behavior network = "tcp" default: return nil, UnknownNetworkError(network) } addrs, err := DefaultResolver.internetAddrList(context.Background(), network, address) if err != nil { return nil, err } return addrs.forResolve(network, address).(*TCPAddr), nil }
ResolveTCPAddr()接收兩個字符串參數。
ResolveTCPAddr()接收的表明TCP地址的字符串(例如localhost:80, 127.0.0.1:80, 或[::1]:80, 都是表明本機的80端口), 返回(net.TCPAddr指針, nil)(若是字符串不能被解析成有效的TCP地址會返回(nil, error))。
func DialTCP(network string, laddr, raddr *TCPAddr) (*TCPConn, error) { switch network { case "tcp", "tcp4", "tcp6": default: return nil, &OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: raddr.opAddr(), Err: UnknownNetworkError(network)} } if raddr == nil { return nil, &OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: nil, Err: errMissingAddress} } c, err := dialTCP(context.Background(), network, laddr, raddr) if err != nil { return nil, &OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: raddr.opAddr(), Err: err} } return c, nil }
DialTCP()函數接收三個參數:
它會鏈接撥號兩個TCP地址,並返回這個鏈接做爲net.TCPConn對象返回(鏈接失敗返回error)。若是咱們不須要對Dial設置有過多控制,那麼咱們就可使用Dial()代替。
func Dial(network, address string) (Conn, error) { var d Dialer return d.Dial(network, address) }
Dial()函數接收一個TCP地址,返回一個通常的net.Conn。 這已經足夠咱們的測試用例了。然而若是你須要只有在TCP鏈接上的可用功能,可使用TCP變體(DialTCP, TCPConn, TCPAddr等等)。
成功撥號以後,咱們就能夠如上所述的那樣,將新的鏈接與其餘的輸入輸出流同等對待了。咱們甚至能夠將鏈接包裝進bufio.ReadWriter中,這樣可使用各類ReadWriter方法,例如ReadString(), ReadBytes, WriteString等等。
func Open(addr string) (*bufio.ReadWriter, error) { conn, err := net.Dial("tcp", addr) if err != nil { return nil, errors.Wrap(err, "Dialing "+addr+" failed") } // 將net.Conn對象包裝到bufio.ReadWriter中 return bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), nil }
記住緩衝Writer在寫以後須要調用Flush()方法, 這樣全部的數據纔會刷到底層網絡鏈接中。
最後,每一個鏈接對象都有一個Close()方法來終止通訊。
Dialer結構體定義以下:
type Dialer struct { Timeout time.Duration Deadline time.Time LocalAddr Addr DualStack bool FallbackDelay time.Duration KeepAlive time.Duration Resolver *Resolver Cancel <-chan struct{} }
有兩個可用選項能夠微調。
所以Dialer接口提供了能夠微調的兩方面選項:
type Conn interface { Read(b []byte) (n int, err error) Write(b []byte) (n int, err error) Close() error LocalAddr() Addr RemoteAddr() Addr SetDeadline(t time.Time) error SetReadDeadline(t time.Time) error SetWriteDeadline(t time.Time) error }
net.Conn接口是面向流的通常的網絡鏈接。它具備下面這些接口方法:
Conn接口也有deadline設置; 有對整個鏈接的(SetDeadLine()),也有特定讀寫調用的(SetReadDeadLine()和SetWriteDeadLine())。
注意deadline是(wallclock)時間固定點。和timeout不一樣,它們新活動以後不會重置。所以鏈接上的每一個活動必須設置新的deadline。
下面的樣本代碼沒有使用deadline, 由於它足夠簡單,咱們能夠很容易看到何時會被卡住。Ctrl-C時咱們手動觸發deadline的工具。
接收端步驟以下:
監聽須要指定本地監聽的端口號。通常來講,監聽應用程序(也叫server)宣佈監聽的端口號,若是提供標準服務, 那麼使用這個服務對應的相關端口。例如,web服務一般監聽80來伺服HTTP, 443端口伺服HTTPS請求。 SSH守護默認監聽22端口, WHOIS服務使用端口43。
type Listener interface { // Accept waits for and returns the next connection to the listener. Accept() (Conn, error) // Close closes the listener. // Any blocked Accept operations will be unblocked and return errors. Close() error // Addr returns the listener's network address. Addr() Addr }
func Listen(network, address string) (Listener, error) { addrs, err := DefaultResolver.resolveAddrList(context.Background(), "listen", network, address, nil) if err != nil { return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err} } var l Listener switch la := addrs.first(isIPv4).(type) { case *TCPAddr: l, err = ListenTCP(network, la) case *UnixAddr: l, err = ListenUnix(network, la) default: return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}} } if err != nil { return nil, err // l is non-nil interface containing nil pointer } return l, nil }
net包實現服務端的核心部分是:
net.Listen()在給定的本地網絡地址上來建立新的監聽器。若是隻傳端口號給它,例如":61000", 那麼監聽器會監聽全部可用的網絡接口。 這至關方便,由於計算機一般至少提供兩個活動接口,迴環接口和最少一個真實網卡。 這個函數成功的話返回Listener。
Listener接口有一個Accept()方法用來等待請求進來。而後它接受請求,並給調用者返回新的鏈接。Accept()通常來講都是在循環中調用,可以同時服務多個鏈接。每一個鏈接能夠由一個單獨的goroutine處理,正以下面代碼所示的。
與其讓代碼來回推送一些字節,我更想要它演示一些更有用的東西。 我想讓它能給服務器發送帶有不一樣數據載體的不一樣命令。服務器應該能標識每一個命令和解碼命令數據。
咱們代碼中客戶端會發送兩種類型的命令: "STRING"和"GOB"。它們都以換行符終止。
"STRING"命令包含一行字符串數據,能夠經過bufio中的簡單讀寫操做來處理。
"GOB"命令由結構體組成,這個結構體包含一些字段,包含一個分片和映射,甚至指向本身的指針。 正如你所見,當運行這個代碼時,gob包能經過咱們的網絡鏈接移動這些數據沒有什麼稀奇(fuss).
咱們這裏基本上都是一些即席協議(ad-hoc protocol: 特設的、特定目的的、即席的、專案的), 客戶端和服務端都遵循它,命令行後面是換行,而後是數據。對於每一個命令來講,服務端必須知道數據的確切格式,知道如何處理它。
要達到這個目的,服務端代碼採起兩步方式實現。
package main import ( "bufio" "encoding/gob" "flag" "github.com/pkg/errors" "io" "log" "net" "strconv" "strings" "sync" ) type complexData struct { N int S string M map[string]int P []byte C *complexData } const ( Port = ":61000" )
使用發射鏈接是一種快照。net.Conn知足io.Reader和io.Writer接口,所以咱們能夠將TCP鏈接和其餘任何的Reader和Writer同樣看待。
func Open(addr string) (*bufio.ReadWriter, error) { log.Println("Dial " + addr) conn, err := net.Dial("tcp", addr) if err != nil { return nil, errors.Wrap(err, "Dialing " + addr + " failed") } return bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), nil }
打開TCP地址的鏈接。它返回一個帶有超時的TCP鏈接,並將其包裝進緩衝的ReadWriter。撥號遠程進程。注意本地端口是實時(on the fly)分配的。若是必須指定本地端口號,請使用DialTCP()方法。
這節有點涉及到對進入數據的準備環節處理。根據咱們前面介紹的ad-hoc協議,命令名+換行符+數據+換行符。天然數據是和具體命令相關的。要處理這樣的狀況,咱們建立了一個Endpoint對象,它具備下面的屬性:
首先咱們聲明一個HandleFunc類型,該類型爲接收一個bufio.ReadWriter指針值的函數類型, 也就是後面咱們要爲每種不一樣命令註冊的處理器函數。它接收的參數是使用ReadWriter接口包裝的net.Conn鏈接。
type HandleFunc func(*bufio.ReadWriter)
而後咱們聲明一個Endpoint結構體類型,它有三個屬性:
type Endpoint struct { listener net.Listener handler map[string]HandleFunc m sync.RWMutex // Maps不是線程安全的,所以須要互斥鎖來控制訪問。 } func NewEndpoint() *Endpoint { return &Endpoint{ handler: map[string]HandleFunc{}, } } func (e *Endpoint) AddHandleFunc(name string, f HandleFunc) { e.m.Lock() e.handler[name] = f e.m.Unlock() } func (e *Endpoint) Listen() error { var err error e.listener, err = net.Listen("tcp", Port) if err != nil { return errors.Wrap(err, "Unable to listen on "+e.listener.Addr().String()+"\n") } log.Println("Listen on", e.listener.Addr().String()) for { log.Println("Accept a connection request.") conn, err := e.listener.Accept() if err != nil { log.Println("Failed accepting a connection request:", err) continue } log.Println("Handle incoming messages.") go e.handleMessages(conn) } } // handleMessages讀取鏈接到第一個換行符。 基於這個字符串,它會調用恰當的HandleFunc。 func (e *Endpoint) handleMessages(conn net.Conn) { // 將鏈接包裝到緩衝reader以便於讀取 rw := bufio.NewReadWrite(bufio.NewReader(conn), bufio.NewWriter(conn)) defer conn.Close() // 從鏈接讀取直到遇到EOF. 指望下一次輸入是命令名。調用註冊的用於該命令的處理器。 for { log.Print("Receive command '") cmd, err := rw.ReadString('\n') switch { case err == io.EOF: log.Println("Reached EOF - close this connection.\n ---") return case err != nil: log.Println("\nError reading command. Got: '" + cmd + "'\n", err) } // 修剪請求字符串中的多餘回車和空格- ReadString不會去掉任何換行。 cmd = strings.Trim(cmd, "\n ") log.Println(cmd + "'") // 從handler映射中獲取恰當的處理器函數, 並調用它。 e.m.Lock() handleCommand, ok := e.handler[cmd] e.m.Unlock() if !ok { log.Println("Command '" + cmd + "' is not registered.") return } handleCommand(rw) } }
NewEndpoint()函數是Endpoint的工廠函數。它只對handler映射進行了初始化。爲了簡化問題,假設咱們的終端監聽的端口好是固定的。
Endpoint類型聲明瞭幾個方法:
注意上面如何使用動態函數的。 根據命令名查找具體函數,而後這個具體函數賦值給handleCommand, 其實這個變量類型爲HandleFunc類型, 即前面聲明的處理器函數類型。
Endpoint的Listen方法調用以前須要先至少註冊一個處理器函數。所以咱們下面定義兩個類型的處理器函數: handleStrings和handleGob。
handleStrings()函數接收和處理咱們即時協議中只發送字符串數據的處理器函數。handleGob()函數是接收並處理髮送的gob數據的複雜結構體。handleGob稍微複雜一點,除了讀取數據外,咱們海須要解碼數據。
咱們能夠看到連續兩次使用rw.ReadString('n'), 讀取字符串,遇到換行中止, 將讀到的內容保存到字符串中。注意這個字符串是包含末尾換行的。
另外對於普通字符串數據來講,咱們直接用bufio包裝鏈接後的ReadString來讀取。而對於複雜的gob結構體來講,咱們使用gob來解碼數據。
func handleStrings(rw *bufio.ReadWriter) { log.Print("Receive STRING message:") s, err := rw.ReadString('\n') if err != nil { log.Println("Cannot read from connection.\n", err) } s = strings.Trim(s, "\n ") log.Println(s) -, err = rw.WriteString("Thank you.\n") if err != nil { log.Println("Cannot write to connection.\n", err) } err = rw.Flush() if err != nil { log.Println("Flush failed.", err) } } func handleGob(rw *bufio.ReadWriter) { log.Print("Receive GOB data:") var data complexData dec := gob.NewDecoder(rw) err := dec.Decode(&data) if err != nil { log.Println("Error decoding GOB data:", err) return } log.Printf("Outer complexData struct: \n%#v\n", data) log.Printf("Inner complexData struct: \n%#v\n", data.C) }
一切就緒,咱們能夠準備咱們的客戶端和服務端函數了。
// 當應用程序使用-connect=ip地址的時候被調用 func client(ip string) error { testStruct := complexData{ N: 23, S: "string data", M: map[string]int{"one": 1, "two": 2, "three": 3}, P: []byte("abc"), C: &complexData{ N: 256, S: "Recursive structs? Piece of cake!", M: Map[string]int{"01": "10": 2, "11": 3}, }, } rw, err := Open(ip + Port) if err != nil { return errors.Wrap(err, "Client: Failed to open connection to " + ip + Port) } log.Println("Send the string request.") n, err := rw.WriteString("STRING\n") if err != nil { return errors.Wrap(err, "Could not send the STRING request (" + strconv.Itoa(n) + " bytes written)") } // 發送STRING請求。發送請求名併發送數據。 log.Println("Send the string request.") n, err = rw.WriteString("Additional data.\n") if err != nil { return errors.Wrap(err, "Could not send additional STRING data (" + strconv.Itoa(n) + " bytes written)") } log.Println("Flush the buffer.") err = rw.Flush() if err != nil { return errors.Wrap(err, "Flush failed.") } // 讀取響應 log.Println("Read the reply.") response, err := rw.ReadString('\n') if err != nil { return errors.Wrap(err, "Client: Failed to read the reply: '" + response + "'") } log.Println("STRING request: got a response:", response) // 發送GOB請求。 建立一個encoder直接將它轉換爲rw.Send的請求名。發送GOB log.Println("Send a struct as GOB:") log.Printf("Outer complexData struct: \n%#v\n", testStruct) log.Printf("Inner complexData struct: \n%#v\n", testStruct.C) enc := gob.NewDecoder(rw) n, err = rw.WriteString("GOB\n") if err != nil { return errors.Wrap(err, "Could not write GOB data (" + strconv.Itoa(n) + " bytes written)") } err = enc.Encode(testStruct) if err != nil { return errors.Wrap(err, "Encode failed for struct: %#v", testStruct) } err = rw.Flush() if err != nil { return errors.Wrap(err, "Flush failed.") } return nil }
客戶端函數在執行應用程序時指定connect標誌的時候執行,這點後面的代碼能夠看到。
下面是服務端程序server。服務端監聽進來的請求並根據請求命令名將它們調度給註冊的具體相關處理器。
func server() error { endpoint := NewEndpoint() // 添加處理器函數 endpoint.AddHandleFunc("STRING", handleStrings) endpoint.AddHandleFunc("GOB", handleGOB) // 開始監聽 return endpoint.Listen() }
下面的main函數既能夠啓動客戶端也能夠啓動服務端, 依賴因而否設置connect標誌。 若是沒有這個標誌,則以服務器啓動進程, 監聽進來的請求。若是有標誌, 啓動爲客戶端,並鏈接到這個標誌指定的主機。
可使用localhost或127.0.0.1在同一機器上運行這兩個進程。
func main() { connect := flag.String("connect", "", "IP address of process to join. If empty, go into the listen mode.") flag.Parse() // 若是設置了connect標誌,進入客戶端模式 if *connect != '' { err := client(*connect) if err != nil { log.Println("Error:", errors.WithStack(err)) } log.Println("Client done.") return } // 不然進入服務端模式 err := server() if err != nil { log.Println("Error:", errors.WithStack(err)) } log.Println("Server done.") } // 設置日誌記錄的字段標誌 func init() { log.SetFlags(log.Lshortfile) }
第一步: 獲取代碼。 注意-d標誌自動安裝二進制到$GOPATH/bin目錄。
go get -d github.com/appliedgo/networking
第二步: cd到源代碼目錄。
cd $GOPATH/src/github.com/appliedgo/networking
第三步: 運行服務端。
go run networking.go
第四步: 打開另一個shell, 一樣進入到源碼目錄(第二步), 而後運行客戶端。
go run networking.go -connect localhost
若是你想稍微修改下源代碼,下面是一些建議:
2017-02-09: map不是線程安全的,所以若是在不一樣的goroutine中使用同一個map, 應該使用互斥鎖來控制map的訪問。
而上面的代碼,map在goroutine啓動以前已經添加好了, 所以你能夠安全的修改代碼,在handleMessages goroutine已經運行的時候調用AddHandleFunc()。
---- 2018-05-04 -----