張仕華redis
cmd/proxy/main.go文件數組
解析配置文件以後重點是proxy.New(config)函數session
該函數中,首先會建立一個Proxy結構體,以下:函數
type Proxy struct { mu sync.Mutex ... config *Config router *Router //Router中比較重要的是鏈接池和slots ... lproxy net.Listener //19000端口的Listener ladmin net.Listener //11080端口的Listener ... }
而後起兩個協程,分別處理11080和19000端口的請求oop
go s.serveAdmin() go s.serveProxy()
咱們重點看s.serveProxy()的處理流程,即redis client鏈接19000端口後proxy如何分發到codis server而且將結果返回到客戶端編碼
s.serverProxy也啓動了兩個協程,一個協程對router中鏈接池中的鏈接進行鏈接可用性檢測,另外一個協程是一個死循環,accept lproxy端口的鏈接,而且啓動一個新的Session進行處理,代碼流程以下:atom
go func(l net.Listener) (err error) { defer func() { eh <- err }() for { c, err := s.acceptConn(l)//accept鏈接 if err != nil { return err } NewSession(c, s.config).Start(s.router)//啓動一個新的session進行處理 } }(s.lproxy)//s爲proxy,s.lproxy即19000端口的監聽
首先介紹一下Request結構體,該結構體會貫穿整個流程spa
type Request struct { Multi []*redis.Resp //保存請求命令,按redis的resp協議類型將請求保存到Multi字段中 Batch *sync.WaitGroup //返回響應時,會在Batch處等待,r.Batch.Wait(),因此能夠作到當請求執行完成後纔會執行返回函數 Group *sync.WaitGroup Broken *atomic2.Bool OpStr string OpFlag Database int32 UnixNano int64 *redis.Resp //保存響應數據,也是redis的resp協議類型 Err error Coalesce func() error //聚合函數,適用於mget/mset等須要聚合響應的操做命令 }
Start函數處理流程以下:指針
tasks := NewRequestChanBuffer(1024)//tasks是一個指向RequestChan的指針,RequestChan結構體中有一個data字段,data字段是個數組,保存1024個指向Request的指針 go func() { s.loopWriter(tasks)//從RequestChan的data中取出請求而且返回給客戶端,若是是mget/mset這種須要聚合相應的請求,則會等待全部拆分的子請求執行完畢後執行聚合函數,而後將結果返回給客戶端 decrSessions() }() go func() { s.loopReader(tasks, d)//首先根據key計算該key分配到哪一個slot.在此步驟中只會將slot對應的鏈接取出,而後將請求放到鏈接的input字段中。 tasks.Close() }()
能夠看到,s.loopWriter只是從RequestChan的data字段中取出請求而且返回給客戶端,經過上文Request結構體的介紹,能夠看到,經過在request的Batch執行wait操做,只有請求處理完成後loopWriter纔會執行code
下邊咱們看loopReader的執行流程
... r := &Request{} //新建一個Request結構體,該結構體會貫穿請求的始終,請求字段,響應字段都放在Request中 r.Multi = multi r.Batch = &sync.WaitGroup{} r.Database = s.database r.UnixNano = start.UnixNano() if err := s.handleRequest(r, d); err != nil { //執行handleRequest函數,處理請求 r.Resp = redis.NewErrorf("ERR handle request, %s", err) tasks.PushBack(r) if breakOnFailure { return err } } else { tasks.PushBack(r) //若是handleRequest執行成功,將請求r放入tasks(即上文的RequestChan)的data字段中。loopWriter會從該字段中獲取請求而且返回給客戶端 } ...
看handleRequest函數如何處理請求,重點是router的dispatch函數
func (s *Router) dispatch(r *Request) error { hkey := getHashKey(r.Multi, r.OpStr)//hkey爲請求的key var id = Hash(hkey) % MaxSlotNum //hash請求的key以後對1024取模,獲取該key分配到哪一個slot slot := &s.slots[id] //slot都保存在router的slots數組中,獲取對應的slot return slot.forward(r, hkey)//執行slot的forward函數 }
forward函數調用process函數,返回一個BackendConn結構,而後調用其PushBack函數將請求放入bc.input中
func (d *forwardSync) Forward(s *Slot, r *Request, hkey []byte) error { s.lock.RLock() bc, err := d.process(s, r, hkey) //返回一個鏈接,而且將請求放入BackendConn的input中 s.lock.RUnlock() if err != nil { return err } bc.PushBack(r) return nil }
bc.PushBack(r)函數以下:
func (bc *BackendConn) PushBack(r *Request) { if r.Batch != nil { r.Batch.Add(1) //將請求的Batch執行add 1的操做,注意前文中的loopWriter會在Batch處等待 } bc.input <- r //將請求放入bc.input channel }
至此能夠看到,Proxy的處理流程
loopWriter->RuquestChan的data字段中讀取請求而且返回。在Batch處等待
loopReader->將請求放入RequestChan的data字段中,而且將請求放入bc.input channel中。在Batch處加1
很明顯,Proxy並無真正處理請求,確定會有goroutine從bc.input中讀取請求而且處理完成後在Batch處減1,這樣當請求執行完成後,loopWriter就能夠返回給客戶端端響應了。
從上文得知,proxy結構體中有一個router字段,類型爲Router,結構體類型以下:
type Router struct { mu sync.RWMutex pool struct { primary *sharedBackendConnPool //鏈接池 replica *sharedBackendConnPool } slots [MaxSlotNum]Slot //slot ... }
Router的pool中管理鏈接池,執行fillSlot時會真正生成鏈接,放入Slot結構體的backend字段的bc字段中,Slot結構體以下:
type Slot struct { id int ... backend, migrate struct { id int bc *sharedBackendConn } ... method forwardMethod }
咱們看一下bc字段的結構體sharedBackendConn:
type sharedBackendConn struct { addr string //codis server的地址 host []byte //codis server主機名 port []byte //codis server的端口 owner *sharedBackendConnPool //屬於哪一個鏈接池 conns [][]*BackendConn //二維數組,通常codis server會有16個db,第一個維度爲0-15的數組,每一個db能夠有多個BackendConn鏈接 single []*BackendConn //若是每一個db只有一個BackendConn鏈接,則直接放入single中。當每一個db有多個鏈接時會從conns中選一個返回,而每一個db只有一個鏈接時,直接從single中返回 refcnt int }
每一個BackendConn中有一個 input chan *Request字段,是一個channel,channel中的內容爲Request指針。也就是第二章節loopReader選取一個BackendConn後,會將請求放入input中。
下邊咱們看看處理BackendConn input字段中數據的協程是如何啓動並處理數據的。代碼路徑爲pkg/proxy/backend.go的newBackendConn函數
func NewBackendConn(addr string, database int, config *Config) *BackendConn { bc := &BackendConn{ addr: addr, config: config, database: database, } //1024長度的管道,存放1024個*Request bc.input = make(chan *Request, 1024) bc.retry.delay = &DelayExp2{ Min: 50, Max: 5000, Unit: time.Millisecond, } go bc.run() return bc }
能夠看到,在此處建立的BackendConn結構,而且初始化bc.input字段。鏈接池的創建是在proxy初始化啓動的時候就會創建好。繼續看bc.run()函數的處理流程
func (bc *BackendConn) run() { log.Warnf("backend conn [%p] to %s, db-%d start service", bc, bc.addr, bc.database) for round := 0; bc.closed.IsFalse(); round++ { log.Warnf("backend conn [%p] to %s, db-%d round-[%d]", bc, bc.addr, bc.database, round) if err := bc.loopWriter(round); err != nil { //執行loopWriter函數,此處的loopWriter和第二章節的loopWriter只是名稱相同,是兩個不一樣的處理函數 bc.delayBeforeRetry() } } log.Warnf("backend conn [%p] to %s, db-%d stop and exit", bc, bc.addr, bc.database) } func (bc *BackendConn) loopWriter(round int) (err error) { ... c, tasks, err := bc.newBackendReader(round, bc.config) //調用newBackendReader函數。注意此處的tasks也是一個存放*Request的channel,用來此處的loopWriter和loopReader交流信息 if err != nil { return err } ... for r := range bc.input { //能夠看到,此處的loopWriter會從bc.input中取出數據而且處理 ... if err := p.EncodeMultiBulk(r.Multi); err != nil { //將請求編碼而且發送到codis server return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err)) } if err := p.Flush(len(bc.input) == 0); err != nil { return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err)) } else { tasks <- r //將請求放入tasks這個channel中 } } return nil }
注意此處的loopWriter會從bc.input中取出數據發送到codis server,bc.newBackendReader會起一個loopReader,從codis server中讀取數據而且寫到request結構體中,此處的loopReader和loopWriter經過tasks這個channel通訊。
func (bc *BackendConn) newBackendReader(round int, config *Config) (*redis.Conn, chan<- *Request, error) { ... tasks := make(chan *Request, config.BackendMaxPipeline)//建立task這個channel而且返回給loopWriter go bc.loopReader(tasks, c, round)//啓動loopReader return c, tasks, nil } func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round int) (err error) { ... for r := range tasks { //從tasks中取出響應 resp, err := c.Decode() if err != nil { return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err)) } ... bc.setResponse(r, resp, nil)//設置響應數據到request結構體中 } return nil } func (bc *BackendConn) setResponse(r *Request, resp *redis.Resp, err error) error { r.Resp, r.Err = resp, err //Request的Resp字段設置爲響應值 if r.Group != nil { r.Group.Done() } if r.Batch != nil { r.Batch.Done() //注意此處會對Batch執行減1操做,這樣proxy中的loopWriter能夠聚合響應並返回 } return err }
總結一下,BackendConn中的函數功能以下
loopWriter->從bc.input中取出請求而且發給codis server,而且將請求放到tasks channel中
loopReader->從tasks中取出請求,設置codis server的響應字段到Request的Resp字段中,而且將Batch執行減1操做
一圖勝千言,圖片版權歸李老師,以下