nsq 源碼分析之tcp協議部分

    最近恰好看到其餘幾個項目有socket 編程,而後,想了下,在golang 中還沒用過socket tcp 編程,因而看了一些im 的協議解析過程,都不是太滿意,不是太通用,恰好看到nsq,發現nsq 這部分真是簡單粗暴。還用的是標準庫的一些東西,很是通用。html

    該文章後續仍在不斷的更新修改中, 請移步到原文地址http://dmwan.ccpython

    nsq 的協議文檔地址:https://nsq.io/clients/tcp_protocol_spec.html。golang

    首先,tcp 編程,最麻煩的地方是處理粘包,這個玩意怎麼處理,通常是經過分隔符和兩次讀處理。httppaser 基本會封裝這兩種,好比tornado 本身封裝的readbytes,readuntil,一個是定長讀,一個是讀到某個標識符爲止,中間會爲讀寫分配個緩衝區。這裏道理很好理解,定長讀,先讀約定的header,好比先讀4個字節,獲得後面有n個字節後,一直循環向buffer 放,而readuntil 每次read 放到buffer,不斷find 分隔符。這裏的邊界條件,無論是python 仍是c 寫起來都是比較麻煩的事情。編程

    其實golang 裏算是比較簡單的,看個例子,下面這個是典型,第一行是command,'\n'分隔符結尾,第二行,由於這個command 有參數,用4個字節約定後面body 長度。數組

Publish a message to a topic:bash

PUB <topic_name>\n
[ 4-byte size in bytes ][ N-byte binary data ]

<topic_name> - a valid string (optionally having #ephemeral suffix)

    咱們作的時候,只須要有兩個函數,一個能readuntil,一個能readbytes就ok,golang 有沒?其實golang標準庫是有的,分別是bufio.ReadSlice ,對應readuntil, 一個是io.ReadFull, 對應readbytes。app

    nsq tcp server  流程算是比較清晰的。socket

    1,tcp accept 後,由一個 handle 處理,每一個conn 一個IOLoop, 這個IOLoop 會維持這個連接,進行讀寫。tcp

 2, IoLoop 爲每一個conn 新建一個client 對象,這個對象的Reader 實例化的時候用的是*bufio.Reader,而bufio 封裝了一系列的io 方法,這裏比較核心的是ReadSlice ,能讀到後綴爲止。讀到的line 按空格解析成command 和 參數 , 由後面反射成不一樣命令,由Exec 執行。函數

    3, p.Exec 會根據不一樣的命令,執行不一樣方法。

    4,有的命令會附帶body ,這裏body 按照協議,是由4 byte 的size 加具體body 構成。這裏解析,分兩次讀取。以pub 爲例:

    readlen 其實就是ReadFull,client.lenSlice 就是一個[4]byte 的字節數組,因此,其實很明顯,兩次讀取,一個讀長度,按長度分配buffer,二次讀取body。

func readLen(r io.Reader, tmp []byte) (int32, error) {
	_, err := io.ReadFull(r, tmp)
	if err != nil {
		return 0, err
	}
	return int32(binary.BigEndian.Uint32(tmp)), nil
}

    上面過程基本就能夠 理解整個nsq tcp server 的數據流向了,這裏的標準庫函數能夠很快移植到其餘的tcp server 項目中去,只須要構建好本身的protocol,本身的命令反射就ok。

    下面分析下 ReadFull 的源碼,相比其餘語言,按定長讀取,放到buffer,這裏仍是比較有意思的。這樣,讀到指定長度err 爲nil,讀不到,數據爲bad data。同理,readslice 方式相似。

// ReadAtLeast reads from r into buf until it has read at least min bytes.
// It returns the number of bytes copied and an error if fewer bytes were read.
// The error is EOF only if no bytes were read.
// If an EOF happens after reading fewer than min bytes,
// ReadAtLeast returns ErrUnexpectedEOF.
// If min is greater than the length of buf, ReadAtLeast returns ErrShortBuffer.
// On return, n >= min if and only if err == nil.
func ReadAtLeast(r Reader, buf []byte, min int) (n int, err error) {
	if len(buf) < min {
		return 0, ErrShortBuffer
	}
	for n < min && err == nil {
		var nn int
		nn, err = r.Read(buf[n:])// 這裏不會按定長讀取1024,而是按照傳參,變長讀,循環放倒buf,不會出現讀多的問題 
		n += nn
	}
	if n >= min {
		err = nil
	} else if n > 0 && err == EOF {
		err = ErrUnexpectedEOF
	}
	return
}

// ReadFull reads exactly len(buf) bytes from r into buf.
// It returns the number of bytes copied and an error if fewer bytes were read.
// The error is EOF only if no bytes were read.
// If an EOF happens after reading some but not all the bytes,
// ReadFull returns ErrUnexpectedEOF.
// On return, n == len(buf) if and only if err == nil.
func ReadFull(r Reader, buf []byte) (n int, err error) {
	return ReadAtLeast(r, buf, len(buf))
}

     如何防止client 端惡意不傳完整參數?設置超時屬性就ok。以上。

相關文章
相關標籤/搜索