【每日筆記】【Go學習筆記】2019-01-10 codis proxy處理流程

張仕華redis

proxy啓動

cmd/proxy/main.go文件數組

解析配置文件以後重點是proxy.New(config)函數session

該函數中,首先會建立一個Proxy結構體,以下:函數

type Proxy struct {
    mu sync.Mutex

    ...
    config *Config
    router *Router //Router中比較重要的是鏈接池和slots
    ...
    lproxy net.Listener //19000端口的Listener
    ladmin net.Listener //11080端口的Listener
    ...
}

而後起兩個協程,分別處理11080和19000端口的請求oop

go s.serveAdmin()
    go s.serveProxy()

咱們重點看s.serveProxy()的處理流程,即redis client鏈接19000端口後proxy如何分發到codis server而且將結果返回到客戶端編碼

Proxy處理

s.serverProxy也啓動了兩個協程,一個協程對router中鏈接池中的鏈接進行鏈接可用性檢測,另外一個協程是一個死循環,accept lproxy端口的鏈接,而且啓動一個新的Session進行處理,代碼流程以下:atom

go func(l net.Listener) (err error) {
        defer func() {
            eh <- err
        }()
        for {
            c, err := s.acceptConn(l)//accept鏈接
            if err != nil {
                return err
            }
            NewSession(c, s.config).Start(s.router)//啓動一個新的session進行處理
        }
    }(s.lproxy)//s爲proxy,s.lproxy即19000端口的監聽

首先介紹一下Request結構體,該結構體會貫穿整個流程spa

type Request struct {
    Multi []*redis.Resp  //保存請求命令,按redis的resp協議類型將請求保存到Multi字段中
    Batch *sync.WaitGroup //返回響應時,會在Batch處等待,r.Batch.Wait(),因此能夠作到當請求執行完成後纔會執行返回函數

    Group *sync.WaitGroup

    Broken *atomic2.Bool

    OpStr string
    OpFlag

    Database int32
    UnixNano int64

    *redis.Resp //保存響應數據,也是redis的resp協議類型
    Err error

    Coalesce func() error //聚合函數,適用於mget/mset等須要聚合響應的操做命令
}

Start函數處理流程以下:指針

tasks := NewRequestChanBuffer(1024)//tasks是一個指向RequestChan的指針,RequestChan結構體中有一個data字段,data字段是個數組,保存1024個指向Request的指針

        go func() {
            s.loopWriter(tasks)//從RequestChan的data中取出請求而且返回給客戶端,若是是mget/mset這種須要聚合相應的請求,則會等待全部拆分的子請求執行完畢後執行聚合函數,而後將結果返回給客戶端
            decrSessions()
        }()

        go func() {
            s.loopReader(tasks, d)//首先根據key計算該key分配到哪一個slot.在此步驟中只會將slot對應的鏈接取出,而後將請求放到鏈接的input字段中。
            tasks.Close()
        }()

能夠看到,s.loopWriter只是從RequestChan的data字段中取出請求而且返回給客戶端,經過上文Request結構體的介紹,能夠看到,經過在request的Batch執行wait操做,只有請求處理完成後loopWriter纔會執行code

下邊咱們看loopReader的執行流程

...
          r := &Request{}   //新建一個Request結構體,該結構體會貫穿請求的始終,請求字段,響應字段都放在Request中
        r.Multi = multi
        r.Batch = &sync.WaitGroup{}
        r.Database = s.database
        r.UnixNano = start.UnixNano()

        if err := s.handleRequest(r, d); err != nil {  //執行handleRequest函數,處理請求
            r.Resp = redis.NewErrorf("ERR handle request, %s", err) 
            tasks.PushBack(r)
            if breakOnFailure {
                return err
            }
        } else {
            tasks.PushBack(r) //若是handleRequest執行成功,將請求r放入tasks(即上文的RequestChan)的data字段中。loopWriter會從該字段中獲取請求而且返回給客戶端
        }
...

看handleRequest函數如何處理請求,重點是router的dispatch函數

func (s *Router) dispatch(r *Request) error {
    hkey := getHashKey(r.Multi, r.OpStr)//hkey爲請求的key
    var id = Hash(hkey) % MaxSlotNum //hash請求的key以後對1024取模,獲取該key分配到哪一個slot
    slot := &s.slots[id] //slot都保存在router的slots數組中,獲取對應的slot
    return slot.forward(r, hkey)//執行slot的forward函數
}

forward函數調用process函數,返回一個BackendConn結構,而後調用其PushBack函數將請求放入bc.input中

func (d *forwardSync) Forward(s *Slot, r *Request, hkey []byte) error {
    s.lock.RLock()
    bc, err := d.process(s, r, hkey) //返回一個鏈接,而且將請求放入BackendConn的input中
    s.lock.RUnlock()
    if err != nil {
        return err
    }
    bc.PushBack(r)
    return nil
}

bc.PushBack(r)函數以下:

func (bc *BackendConn) PushBack(r *Request) {
    if r.Batch != nil {
        r.Batch.Add(1) //將請求的Batch執行add 1的操做,注意前文中的loopWriter會在Batch處等待
    }
    bc.input <- r //將請求放入bc.input channel
}

至此能夠看到,Proxy的處理流程

loopWriter->RuquestChan的data字段中讀取請求而且返回。在Batch處等待

loopReader->將請求放入RequestChan的data字段中,而且將請求放入bc.input channel中。在Batch處加1

很明顯,Proxy並無真正處理請求,確定會有goroutine從bc.input中讀取請求而且處理完成後在Batch處減1,這樣當請求執行完成後,loopWriter就能夠返回給客戶端端響應了。

BackendConn的處理流程

從上文得知,proxy結構體中有一個router字段,類型爲Router,結構體類型以下:

type Router struct {
    mu sync.RWMutex
    pool struct {
        primary *sharedBackendConnPool //鏈接池
        replica *sharedBackendConnPool
    }
    slots [MaxSlotNum]Slot //slot
    ...
}

Router的pool中管理鏈接池,執行fillSlot時會真正生成鏈接,放入Slot結構體的backend字段的bc字段中,Slot結構體以下:

type Slot struct {
    id   int
    ...
    backend, migrate struct {
        id int
        bc *sharedBackendConn
    }
    ...
    method forwardMethod
}

咱們看一下bc字段的結構體sharedBackendConn:

type sharedBackendConn struct {
    addr string //codis server的地址
    host []byte //codis server主機名
    port []byte //codis server的端口

    owner *sharedBackendConnPool //屬於哪一個鏈接池
    conns [][]*BackendConn //二維數組,通常codis server會有16個db,第一個維度爲0-15的數組,每一個db能夠有多個BackendConn鏈接

    single []*BackendConn //若是每一個db只有一個BackendConn鏈接,則直接放入single中。當每一個db有多個鏈接時會從conns中選一個返回,而每一個db只有一個鏈接時,直接從single中返回

    refcnt int
}

每一個BackendConn中有一個 input chan *Request字段,是一個channel,channel中的內容爲Request指針。也就是第二章節loopReader選取一個BackendConn後,會將請求放入input中。

下邊咱們看看處理BackendConn input字段中數據的協程是如何啓動並處理數據的。代碼路徑爲pkg/proxy/backend.go的newBackendConn函數

func NewBackendConn(addr string, database int, config *Config) *BackendConn {
    bc := &BackendConn{
        addr: addr, config: config, database: database,
    }
    //1024長度的管道,存放1024個*Request
    bc.input = make(chan *Request, 1024)
    bc.retry.delay = &DelayExp2{
        Min: 50, Max: 5000,
        Unit: time.Millisecond,
    }

    go bc.run()

    return bc
}

能夠看到,在此處建立的BackendConn結構,而且初始化bc.input字段。鏈接池的創建是在proxy初始化啓動的時候就會創建好。繼續看bc.run()函數的處理流程

func (bc *BackendConn) run() {
    log.Warnf("backend conn [%p] to %s, db-%d start service",
        bc, bc.addr, bc.database)
    for round := 0; bc.closed.IsFalse(); round++ {
        log.Warnf("backend conn [%p] to %s, db-%d round-[%d]",
            bc, bc.addr, bc.database, round)
        if err := bc.loopWriter(round); err != nil { //執行loopWriter函數,此處的loopWriter和第二章節的loopWriter只是名稱相同,是兩個不一樣的處理函數
            bc.delayBeforeRetry()
        }
    }
    log.Warnf("backend conn [%p] to %s, db-%d stop and exit",
        bc, bc.addr, bc.database)
}
 
func (bc *BackendConn) loopWriter(round int) (err error) {
    ...
    c, tasks, err := bc.newBackendReader(round, bc.config) //調用newBackendReader函數。注意此處的tasks也是一個存放*Request的channel,用來此處的loopWriter和loopReader交流信息
    if err != nil {
        return err
    }
    ...

    for r := range bc.input { //能夠看到,此處的loopWriter會從bc.input中取出數據而且處理
        ...
        if err := p.EncodeMultiBulk(r.Multi); err != nil { //將請求編碼而且發送到codis server
            return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
        }
        if err := p.Flush(len(bc.input) == 0); err != nil {
            return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
        } else {
            tasks <- r  //將請求放入tasks這個channel中
        }
    }
    return nil
}

注意此處的loopWriter會從bc.input中取出數據發送到codis server,bc.newBackendReader會起一個loopReader,從codis server中讀取數據而且寫到request結構體中,此處的loopReader和loopWriter經過tasks這個channel通訊。

func (bc *BackendConn) newBackendReader(round int, config *Config) (*redis.Conn, chan<- *Request, error) {
    ...
    tasks := make(chan *Request, config.BackendMaxPipeline)//建立task這個channel而且返回給loopWriter
    go bc.loopReader(tasks, c, round)//啓動loopReader

    return c, tasks, nil
}
func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round int) (err error) {
       ...
    for r := range tasks {  //從tasks中取出響應
        resp, err := c.Decode()
        if err != nil {
            return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
        }
        ...
        bc.setResponse(r, resp, nil)//設置響應數據到request結構體中
    }
    return nil
}

func (bc *BackendConn) setResponse(r *Request, resp *redis.Resp, err error) error {
    r.Resp, r.Err = resp, err //Request的Resp字段設置爲響應值
    if r.Group != nil {
        r.Group.Done()
    }
    if r.Batch != nil {
        r.Batch.Done() //注意此處會對Batch執行減1操做,這樣proxy中的loopWriter能夠聚合響應並返回
    }
    return err
}

總結一下,BackendConn中的函數功能以下

loopWriter->從bc.input中取出請求而且發給codis server,而且將請求放到tasks channel中

loopReader->從tasks中取出請求,設置codis server的響應字段到Request的Resp字段中,而且將Batch執行減1操做

小結

一圖勝千言,圖片版權歸李老師,以下

clipboard.png

相關文章
相關標籤/搜索