kcp-go源碼解析

kcp-go源碼解析

對kcp-go的源碼解析,有錯誤之處,請必定告之。
sheepbao 2017.0612html

概念

ARQ:自動重傳請求(Automatic Repeat-reQuest,ARQ)是OSI模型中數據鏈路層的錯誤糾正協議之一.
RTO:Retransmission TimeOut
FEC:Forward Error Correctionnode

kcp簡介

kcp是一個基於udp實現快速、可靠、向前糾錯的的協議,能以比TCP浪費10%-20%的帶寬的代價,換取平均延遲下降30%-40%,且最大延遲下降三倍的傳輸效果。純算法實現,並不負責底層協議(如UDP)的收發。查看官方文檔kcpgit

kcp-go是用go實現了kcp協議的一個庫,其實kcp相似tcp,協議的實現也不少參考tcp協議的實現,滑動窗口,快速重傳,選擇性重傳,慢啓動等。
kcp和tcp同樣,也分客戶端和監聽端。github

+-+-+-+-+-+            +-+-+-+-+-+
    |  Client |            |  Server |
    +-+-+-+-+-+            +-+-+-+-+-+
        |------ kcp data ------>|     
        |<----- kcp data -------|

kcp協議

layer model

+----------------------+
|      Session         |
+----------------------+
|      KCP(ARQ)        |
+----------------------+
|      FEC(OPTIONAL)   |
+----------------------+
|      CRYPTO(OPTIONAL)|
+----------------------+
|      UDP(Packet)     |
+----------------------+

KCP header

KCP Header Format算法

4           1   1     2 (Byte)
+---+---+---+---+---+---+---+---+
|     conv      |cmd|frg|  wnd  |
+---+---+---+---+---+---+---+---+
|     ts        |     sn        |
+---+---+---+---+---+---+---+---+
|     una       |     len       |
+---+---+---+---+---+---+---+---+
|                               |
+             DATA              +
|                               |
+---+---+---+---+---+---+---+---+

代碼結構

src/vendor/github.com/xtaci/kcp-go/
├── LICENSE
├── README.md
├── crypt.go    加解密實現
├── crypt_test.go
├── donate.png
├── fec.go      向前糾錯實現
├── frame.png
├── kcp-go.png
├── kcp.go      kcp協議實現
├── kcp_test.go
├── sess.go     會話管理實現
├── sess_test.go
├── snmp.go     數據統計實現
├── updater.go  任務調度實現
├── xor.go      xor封裝
└── xor_test.go

着重研究兩個文件kcp.gosess.goapi

kcp淺析

kcp是基於udp實現的,全部udp的實現這裏不作介紹,kcp作的事情就是怎麼封裝udp的數據和怎麼解析udp的數據,再加各類處理機制,爲了重傳,擁塞控制,糾錯等。下面介紹kcp客戶端和服務端總體實現的流程,只是大概介紹一下函數流,不作詳細解析,詳細解析看後面數據流的解析。session

kcp client總體函數流

和tcp同樣,kcp要鏈接服務端須要先撥號,可是和tcp有個很大的不一樣是,即便服務端沒有啓動,客戶端同樣能夠撥號成功,由於實際上這裏的撥號沒有發送任何信息,而tcp在這裏須要三次握手。app

DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)
	V
net.DialUDP("udp", nil, udpaddr)
	V
NewConn()
	V
newUDPSession() {初始化UDPSession}
	V
NewKCP() {初始化kcp}
	V
updater.addSession(sess) {管理session會話,任務管理,根據用戶設置的internal參數間隔來輪流喚醒任務}
	V
go sess.readLoop()
	V
go s.receiver(chPacket)
	V
s.kcpInput(data)
	V
s.fecDecoder.decodeBytes(data)
	V
s.kcp.Input(data, true, s.ackNoDelay)
	V
kcp.parse_data(seg) {將分段好的數據插入kcp.rcv_buf緩衝}
	V
notifyReadEvent()

客戶端大致的流程如上面所示,先Dial,創建udp鏈接,將這個鏈接封裝成一個會話,而後啓動一個go程,接收udp的消息。tcp

kcp server總體函數流

ListenWithOptions() 
    V
net.ListenUDP()
    V
ServerConn()
    V
newFECDecoder()
    V
go l.monitor() {從chPacket接收udp數據,寫入kcp}
    V
go l.receiver(chPacket) {從upd接收數據,併入隊列}
    V
newUDPSession()
    V
updater.addSession(sess) {管理session會話,任務管理,根據用戶設置的internal參數間隔來輪流喚醒任務}
    V
s.kcpInput(data)`
    V
s.fecDecoder.decodeBytes(data)
    V
s.kcp.Input(data, true, s.ackNoDelay)
    V
kcp.parse_data(seg) {將分段好的數據插入kcp.rcv_buf緩衝}
    V
notifyReadEvent()

服務端的大致流程如上圖所示,先Listen,啓動udp監聽,接着用一個go程監控udp的數據包,負責將不一樣session的數據寫入不一樣的udp鏈接,而後解析封裝將數據交給上層。函數

kcp 數據流詳細解析

無論是kcp的客戶端仍是服務端,他們都有io行爲,就是讀與寫,咱們只分析一個就行了,由於它們讀寫的實現是同樣的,這裏分析客戶端的讀與寫。

kcp client 發送消息

s.Write(b []byte) 
	V
s.kcp.WaitSnd() {}
	V
s.kcp.Send(b) {將數據根據mss分段,並存在kcp.snd_queue}
 	V
s.kcp.flush(false) [flush data to output] {
	if writeDelay==true {
		flush
	}else{
		每隔`interval`時間flush一次
	}
}
 	V
kcp.output(buffer, size) 
 	V
s.output(buf)
 	V
s.conn.WriteTo(ext, s.remote)
 	V
s.conn..Conn.WriteTo(buf)

讀寫都是在sess.go文件中實現的,Write方法:

// Write implements net.Conn
func (s *UDPSession) Write(b []byte) (n int, err error) {
	for {
	    ...

		// api flow control
		if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
			n = len(b)
			for {
				if len(b) <= int(s.kcp.mss) {
					s.kcp.Send(b)
					break
				} else {
					s.kcp.Send(b[:s.kcp.mss])
					b = b[s.kcp.mss:]
				}
			}

			if !s.writeDelay {
				s.kcp.flush(false)
			}
			s.mu.Unlock()
			atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
			return n, nil
		}

        ...
		// wait for write event or timeout
		select {
		case <-s.chWriteEvent:
		case <-c:
		case <-s.die:
		}

		if timeout != nil {
			timeout.Stop()
		}
	}
}

假設發送一個hello消息,Write方法會先判斷髮送窗口是否已滿,滿的話該函數阻塞,不滿則kcp.Send("hello"),而Send函數實現根據mss的值對數據分段,固然這裏的發送的hello,長度過短,只分了一個段,並把它們插入發送的隊列裏。

func (kcp *KCP) Send(buffer []byte) int {
    ...
	for i := 0; i < count; i++ {
		var size int
		if len(buffer) > int(kcp.mss) {
			size = int(kcp.mss)
		} else {
			size = len(buffer)
		}
		seg := kcp.newSegment(size)
		copy(seg.data, buffer[:size])
		if kcp.stream == 0 { // message mode
			seg.frg = uint8(count - i - 1)
		} else { // stream mode
			seg.frg = 0
		}
		kcp.snd_queue = append(kcp.snd_queue, seg)
		buffer = buffer[size:]
	}
	return 0
}

接着判斷參數writeDelay,若是參數設置爲false,則立馬發送消息,不然須要任務調度後纔會觸發發送,發送消息是由flush函數實現的。

// flush pending data
func (kcp *KCP) flush(ackOnly bool) {
	var seg Segment
	seg.conv = kcp.conv
	seg.cmd = IKCP_CMD_ACK
	seg.wnd = kcp.wnd_unused()
	seg.una = kcp.rcv_nxt

	buffer := kcp.buffer
	// flush acknowledges
	ptr := buffer
	for i, ack := range kcp.acklist {
		size := len(buffer) - len(ptr)
		if size+IKCP_OVERHEAD > int(kcp.mtu) {
			kcp.output(buffer, size)
			ptr = buffer
		}
		// filter jitters caused by bufferbloat
		if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
			seg.sn, seg.ts = ack.sn, ack.ts
			ptr = seg.encode(ptr)

		}
	}
	kcp.acklist = kcp.acklist[0:0]

	if ackOnly { // flash remain ack segments
		size := len(buffer) - len(ptr)
		if size > 0 {
			kcp.output(buffer, size)
		}
		return
	}

	// probe window size (if remote window size equals zero)
	if kcp.rmt_wnd == 0 {
		current := currentMs()
		if kcp.probe_wait == 0 {
			kcp.probe_wait = IKCP_PROBE_INIT
			kcp.ts_probe = current + kcp.probe_wait
		} else {
			if _itimediff(current, kcp.ts_probe) >= 0 {
				if kcp.probe_wait < IKCP_PROBE_INIT {
					kcp.probe_wait = IKCP_PROBE_INIT
				}
				kcp.probe_wait += kcp.probe_wait / 2
				if kcp.probe_wait > IKCP_PROBE_LIMIT {
					kcp.probe_wait = IKCP_PROBE_LIMIT
				}
				kcp.ts_probe = current + kcp.probe_wait
				kcp.probe |= IKCP_ASK_SEND
			}
		}
	} else {
		kcp.ts_probe = 0
		kcp.probe_wait = 0
	}

	// flush window probing commands
	if (kcp.probe & IKCP_ASK_SEND) != 0 {
		seg.cmd = IKCP_CMD_WASK
		size := len(buffer) - len(ptr)
		if size+IKCP_OVERHEAD > int(kcp.mtu) {
			kcp.output(buffer, size)
			ptr = buffer
		}
		ptr = seg.encode(ptr)
	}

	// flush window probing commands
	if (kcp.probe & IKCP_ASK_TELL) != 0 {
		seg.cmd = IKCP_CMD_WINS
		size := len(buffer) - len(ptr)
		if size+IKCP_OVERHEAD > int(kcp.mtu) {
			kcp.output(buffer, size)
			ptr = buffer
		}
		ptr = seg.encode(ptr)
	}

	kcp.probe = 0

	// calculate window size
	cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
	if kcp.nocwnd == 0 {
		cwnd = _imin_(kcp.cwnd, cwnd)
	}

	// sliding window, controlled by snd_nxt && sna_una+cwnd
	newSegsCount := 0
	for k := range kcp.snd_queue {
		if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
			break
		}
		newseg := kcp.snd_queue[k]
		newseg.conv = kcp.conv
		newseg.cmd = IKCP_CMD_PUSH
		newseg.sn = kcp.snd_nxt
		kcp.snd_buf = append(kcp.snd_buf, newseg)
		kcp.snd_nxt++
		newSegsCount++
		kcp.snd_queue[k].data = nil
	}
	if newSegsCount > 0 {
		kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
	}

	// calculate resent
	resent := uint32(kcp.fastresend)
	if kcp.fastresend <= 0 {
		resent = 0xffffffff
	}

	// check for retransmissions
	current := currentMs()
	var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
	for k := range kcp.snd_buf {
		segment := &kcp.snd_buf[k]
		needsend := false
		if segment.xmit == 0 { // initial transmit
			needsend = true
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
		} else if _itimediff(current, segment.resendts) >= 0 { // RTO
			needsend = true
			if kcp.nodelay == 0 {
				segment.rto += kcp.rx_rto
			} else {
				segment.rto += kcp.rx_rto / 2
			}
			segment.resendts = current + segment.rto
			lost++
			lostSegs++
		} else if segment.fastack >= resent { // fast retransmit
			needsend = true
			segment.fastack = 0
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
			change++
			fastRetransSegs++
		} else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
			needsend = true
			segment.fastack = 0
			segment.rto = kcp.rx_rto
			segment.resendts = current + segment.rto
			change++
			earlyRetransSegs++
		}

		if needsend {
			segment.xmit++
			segment.ts = current
			segment.wnd = seg.wnd
			segment.una = seg.una

			size := len(buffer) - len(ptr)
			need := IKCP_OVERHEAD + len(segment.data)

			if size+need > int(kcp.mtu) {
				kcp.output(buffer, size)
				current = currentMs() // time update for a blocking call
				ptr = buffer
			}

			ptr = segment.encode(ptr)
			copy(ptr, segment.data)
			ptr = ptr[len(segment.data):]

			if segment.xmit >= kcp.dead_link {
				kcp.state = 0xFFFFFFFF
			}
		}
	}

	// flash remain segments
	size := len(buffer) - len(ptr)
	if size > 0 {
		kcp.output(buffer, size)
	}

	// counter updates
	sum := lostSegs
	if lostSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
	}
	if fastRetransSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
		sum += fastRetransSegs
	}
	if earlyRetransSegs > 0 {
		atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
		sum += earlyRetransSegs
	}
	if sum > 0 {
		atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
	}

	// update ssthresh
	// rate halving, https://tools.ietf.org/html/rfc6937
	if change > 0 {
		inflight := kcp.snd_nxt - kcp.snd_una
		kcp.ssthresh = inflight / 2
		if kcp.ssthresh < IKCP_THRESH_MIN {
			kcp.ssthresh = IKCP_THRESH_MIN
		}
		kcp.cwnd = kcp.ssthresh + resent
		kcp.incr = kcp.cwnd * kcp.mss
	}

	// congestion control, https://tools.ietf.org/html/rfc5681
	if lost > 0 {
		kcp.ssthresh = cwnd / 2
		if kcp.ssthresh < IKCP_THRESH_MIN {
			kcp.ssthresh = IKCP_THRESH_MIN
		}
		kcp.cwnd = 1
		kcp.incr = kcp.mss
	}

	if kcp.cwnd < 1 {
		kcp.cwnd = 1
		kcp.incr = kcp.mss
	}
}

flush函數很是的重要,kcp的重要參數都是在調節這個函數的行爲,這個函數只有一個參數ackOnly,意思就是隻發送ack,若是ackOnly爲true的話,該函數只遍歷ack列表,而後發送,就完事了。 若是不是,也會發送真實數據。 在發送數據前先進行windSize探測,若是開啓了擁塞控制nc=0,則每次發送前檢測服務端的winsize,若是服務端的winsize變小了,自身的winsize也要更着變小,來避免擁塞。若是沒有開啓擁塞控制,就按設置的winsize進行數據發送。
接着循環每一個段數據,並判斷每一個段數據的是否該重發,還有何時重發:

  1. 若是這個段數據首次發送,則直接發送數據。
  2. 若是這個段數據的當前時間大於它自身重發的時間,也就是RTO,則重傳消息。
  3. 若是這個段數據的ack丟失累計超過resent次數,則重傳,也就是快速重傳機制。這個resent參數由resend參數決定。
  4. 若是這個段數據的ack有丟失且沒有新的數據段,則觸發ER,ER相關信息ER

最後經過kcp.output發送消息hello,output是個回調函數,函數的實體是sess.go的:

func (s *UDPSession) output(buf []byte) {
	var ecc [][]byte

	// extend buf's header space
	ext := buf
	if s.headerSize > 0 {
		ext = s.ext[:s.headerSize+len(buf)]
		copy(ext[s.headerSize:], buf)
	}

	// FEC stage
	if s.fecEncoder != nil {
		ecc = s.fecEncoder.Encode(ext)
	}

	// encryption stage
	if s.block != nil {
		io.ReadFull(rand.Reader, ext[:nonceSize])
		checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
		binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
		s.block.Encrypt(ext, ext)

		if ecc != nil {
			for k := range ecc {
				io.ReadFull(rand.Reader, ecc[k][:nonceSize])
				checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
				binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
				s.block.Encrypt(ecc[k], ecc[k])
			}
		}
	}

	// WriteTo kernel
	nbytes := 0
	npkts := 0
	// if mrand.Intn(100) < 50 {
	for i := 0; i < s.dup+1; i++ {
		if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
			nbytes += n
			npkts++
		}
	}
	// }

	if ecc != nil {
		for k := range ecc {
			if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
				nbytes += n
				npkts++
			}
		}
	}
	atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
	atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
}

output函數纔是真正的將數據寫入內核中,在寫入以前先進行了fec編碼,fec編碼器的實現是用了一個開源庫github.com/klauspost/reedsolomon,編碼之後的hello就不是和原來的hello同樣了,至少多了幾個字節。 fec編碼器有兩個重要的參數reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1)),dataShardsparityShards,這兩個參數決定了fec的冗餘度,冗餘度越大抗丟包性就越強。

kcp的任務調度器

其實這裏任務調度器是一個很簡單的實現,用一個全局變量updater來管理session,代碼文件爲updater.go。其中最主要的函數

func (h *updateHeap) updateTask() {
	var timer <-chan time.Time
	for {
		select {
		case <-timer:
		case <-h.chWakeUp:
		}

		h.mu.Lock()
		hlen := h.Len()
		now := time.Now()
		if hlen > 0 && now.After(h.entries[0].ts) {
			for i := 0; i < hlen; i++ {
				entry := heap.Pop(h).(entry)
				if now.After(entry.ts) {
					entry.ts = now.Add(entry.s.update())
					heap.Push(h, entry)
				} else {
					heap.Push(h, entry)
					break
				}
			}
		}
		if hlen > 0 {
			timer = time.After(h.entries[0].ts.Sub(now))
		}
		h.mu.Unlock()
	}
}

任務調度器實現了一個堆結構,每當有新的鏈接,session都會插入到這個堆裏,接着for循環每隔interval時間,遍歷這個堆,獲得entry而後執行entry.s.update()。而entry.s.update()會執行s.kcp.flush(false)來發送數據。

總結

這裏簡單介紹了kcp的總體流程,詳細介紹了發送數據的流程,但未介紹kcp接收數據的流程,其實在客戶端發送數據後,服務端是須要返回ack的,而客戶端也須要根據返回的ack來判斷數據段是否須要重傳仍是在隊列裏清除該數據段。處理返回來的ack是在函數kcp.Input()函數實現的。具體詳細流程下次再介紹。

相關文章
相關標籤/搜索