【go-libp2p源碼剖析】Swarm撥號

1. 簡介

libp2p swarm 是用於 libp2p 網絡的「低級」接口,能夠更精細地控制系統的各個方面。swarm 能夠創建監聽,也能夠向其餘主機撥號創建新的鏈接(好比和某個主機創建 tcp 鏈接),而這裏所指的撥號其實就是創建出站鏈接的過程,它的實現邏輯較爲複雜,咱們在這裏作一個梳理。node

2. 代碼結構

倉庫地址:https://github.com/libp2p/go-libp2p-swarm.git
撥號相關代碼主要分佈在swarm_dial.go,limiter.go,dial_sync.go這三個文件,它們包含的結構體:
swarm_dial.go:DialBackoff,backoffAddr
DialBackoff 主要用於撥號失敗後再次撥號時間限制
dial_sync.go:DialSync、activeDial
DialSync 同步撥號幫助程序,同一時刻只有一個到指定 Peer 的撥號處於活躍狀態
limiter.go:dialLimiter、dialJob、dialResult
dialLimiter 主要對撥號併發數限制git

3.時序圖

在這裏插入圖片描述

經過上圖能夠看出撥號實際上是對併發撥號、同步、重試作了一系列檢查,最後再調用 Transport 進行撥號。假設有1000個 Peer,每一個 Peer 有5個不一樣的地址,若是一個一個同步撥勢必影響效率,因此須要啓動多個協程併發撥號,但也不能徹底不限制,dialLimiter 實現了對併發撥號的限制。若是一個地址撥號失敗,也不能當即就去再次嘗試撥號,這樣大機率會失敗,這裏須要等一段時間再去撥,不然就是浪費資源,因此等多久這裏有個算法,DialBackoff 實現了這些功能。那爲何還須要 DialSync?外部程序在調用 DialPeer 的時候,可能啓動了多個協程對同一個 Peer 進行併發撥號,由於沒法限制外面是怎麼調用方式,因此只有在撥號源頭進行限制(內部已經實現了併發撥號)。
這裏借用 swarm_dial.go 裏的一張圖,看看 DialSync 是如何工做的:github

Diagram of dial sync:

   many callers of Dial()   synched w.  dials many addrs       results to callers
  ----------------------\    dialsync    use earliest          /--------------
  -----------------------\             |----------\           /----------------
  ------------------------>------------<------------>---------<-----------------
  -----------------------|              \----x                 \----------------
  ----------------------|                \-----x                \---------------
                                         any may fail          if no addr at end retry dialAttempt x

3.調用入口

swarm 對外暴露了一個 DialPeer 方法,應用程序能夠直接經過它對 Peer 撥號,有兩處用到了它。算法

// DialPeer connects to a peer.
func (s *Swarm) DialPeer(ctx context.Context, p peer.ID) (network.Conn, error) {
    if s.gater != nil && !s.gater.InterceptPeerDial(p) {
        log.Debugf("gater disallowed outbound connection to peer %s", p.Pretty())
        return nil, &DialError{Peer: p, Cause: ErrGaterDisallowedConnection}
    }

    return s.dialPeer(ctx, p)
}

1.BasicHost 的 Connect 方法調用了 DialPeer 方法安全

func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
    // absorb addresses into peerstore
    h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)

    if h.Network().Connectedness(pi.ID) == network.Connected {
        return nil
    }

    resolved, err := h.resolveAddrs(ctx, h.Peerstore().PeerInfo(pi.ID))
    if err != nil {
        return err
    }
    h.Peerstore().AddAddrs(pi.ID, resolved, peerstore.TempAddrTTL)

    return h.dialPeer(ctx, pi.ID)
}

func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error {
    log.Debugf("host %s dialing %s", h.ID(), p)
    c, err := h.Network().DialPeer(ctx, p)
    if err != nil {
        return err
    }
    select {
    case <-h.ids.IdentifyWait(c):
    case <-ctx.Done():
        return ctx.Err()
    }

    log.Debugf("host %s finished dialing %s", h.ID(), p)
    return nil
}

最後 IpfsDHT 調用了 BasicHost 的 Connect網絡

func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {
    // short-circuit if we're already connected.
    if dht.host.Network().Connectedness(p) == network.Connected {
        return nil
    }

    logger.Debug("not connected. dialing.")
    routing.PublishQueryEvent(ctx, &routing.QueryEvent{
        Type: routing.DialingPeer,
        ID:   p,
    })

    pi := peer.AddrInfo{ID: p}
    if err := dht.host.Connect(ctx, pi); err != nil {
        logger.Debugf("error connecting: %s", err)
        routing.PublishQueryEvent(ctx, &routing.QueryEvent{
            Type:  routing.QueryError,
            Extra: err.Error(),
            ID:    p,
        })

        return err
    }
    logger.Debugf("connected. dial success.")
    return nil
}

2.另外 Swarm 的 NewStream 也調用了 dialPeer,若是尚未創建鏈接則先撥號併發

func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error) {
    log.Debugf("[%s] opening stream to peer [%s]", s.local, p)
    dials := 0
    for {
        c := s.bestConnToPeer(p)
        if c == nil {
            if nodial, _ := network.GetNoDial(ctx); nodial {
                return nil, network.ErrNoConn
            }

            if dials >= DialAttempts {
                return nil, errors.New("max dial attempts exceeded")
            }
            dials++

            var err error
            c, err = s.dialPeer(ctx, p)
            if err != nil {
                return nil, err
            }
        }
        s, err := c.NewStream()
        if err != nil {
            if c.conn.IsClosed() {
                continue
            }
            return nil, err
        }
        return s, nil
    }
}

4.撥號程序初始化

Swarm{
    ....
    // dialing helpers
    dsync   *DialSync
    backf   DialBackoff
    limiter *dialLimiter
}

func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc metrics.Reporter, extra ...interface{}) *Swarm {
    s := &Swarm{
        local: local,
        peers: peers,
        bwc:   bwc,
    }
    .....
    s.dsync = NewDialSync(s.doDial)
    s.limiter = newDialLimiter(s.dialAddr, s.IsFdConsumingAddr)
    s.proc = goprocessctx.WithContext(ctx)
    s.ctx = goprocessctx.OnClosingContext(s.proc)
    s.backf.init(s.ctx)

    return s
}

type DialFunc func(context.Context, peer.ID) (*Conn, error)

// NewDialSync constructs a new DialSync
func NewDialSync(dfn DialFunc) *DialSync {
    return &DialSync{
        dials:    make(map[peer.ID]*activeDial),
        dialFunc: dfn,
    }
}

type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.CapableConn, error)
type isFdConsumingFnc func(ma.Multiaddr) bool

func newDialLimiter(df dialfunc, fdFnc isFdConsumingFnc) *dialLimiter {
    fd := ConcurrentFdDials
    if env := os.Getenv("LIBP2P_SWARM_FD_LIMIT"); env != "" {
        if n, err := strconv.ParseInt(env, 10, 32); err == nil {
            fd = int(n)
        }
    }
    return newDialLimiterWithParams(fdFnc, df, fd, DefaultPerPeerRateLimit)
}

func newDialLimiterWithParams(isFdConsumingFnc isFdConsumingFnc, df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
    return &dialLimiter{
        isFdConsumingFnc:   isFdConsumingFnc,
        fdLimit:            fdLimit,
        perPeerLimit:       perPeerLimit,
        waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
        activePerPeer:      make(map[peer.ID]int),
        dialFunc:           df,
    }
}

func (db *DialBackoff) init(ctx context.Context) {
    if db.entries == nil {
        db.entries = make(map[peer.ID]map[string]*backoffAddr)
    }
    go db.background(ctx)
}

在 NewSwarm 實例的時候對 DialBackoff、dialLimiter、DialBackoff 三個撥號幫助程序進行了初始化。
NewDialSync 須要傳入一個撥號函數作參數(實際調用 Swarm 的 doDial 函數)
newDialLimiter 則須要傳入兩個函數:一個爲撥號函數(實際調用 Swarm 的 dialAddr 函數),一個是判斷協議是否須要消耗 FD(UNIX/TCP)
DialBackoff 的 init 後臺還會啓動一個協程作 Backoff 清理工做app

5.涉及協程

一、針對每個 peer,在 DialSync 啓動了一個協程去撥號tcp

func (ad *activeDial) start(ctx context.Context) {
    ad.conn, ad.err = ad.ds.dialFunc(ctx, ad.id)

    // This isn't the user's context so we should fix the error.
    switch ad.err {
    case context.Canceled:
        // The dial was canceled with `CancelDial`.
        ad.err = errDialCanceled
    case context.DeadlineExceeded:
        // We hit an internal timeout, not a context timeout.
        ad.err = ErrDialTimeout
    }
    close(ad.waitch)
    ad.cancel()
}

func (ds *DialSync) getActiveDial(p peer.ID) *activeDial {
    ds.dialsLk.Lock()
    defer ds.dialsLk.Unlock()

    actd, ok := ds.dials[p]
    if !ok {
        adctx, cancel := context.WithCancel(context.Background())
        actd = &activeDial{
            id:     p,
            cancel: cancel,
            waitch: make(chan struct{}),
            ds:     ds,
        }
        ds.dials[p] = actd

        go actd.start(adctx)
    }

    // increase ref count before dropping dialsLk
    actd.incref()

    return actd
}

二、針對每個 Peer 的每一個地址,在 dialLimiter 啓動了一個協程去撥號分佈式

func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) {
    if dl.shouldConsumeFd(dj.addr) {
        if dl.fdConsuming >= dl.fdLimit {
            log.Debugf("[limiter] blocked dial waiting on FD token; peer: %s; addr: %s; consuming: %d; "+
                "limit: %d; waiting: %d", dj.peer, dj.addr, dl.fdConsuming, dl.fdLimit, len(dl.waitingOnFd))
            dl.waitingOnFd = append(dl.waitingOnFd, dj)
            return
        }

        log.Debugf("[limiter] taking FD token: peer: %s; addr: %s; prev consuming: %d",
            dj.peer, dj.addr, dl.fdConsuming)
        // take token
        dl.fdConsuming++
    }

    log.Debugf("[limiter] executing dial; peer: %s; addr: %s; FD consuming: %d; waiting: %d",
        dj.peer, dj.addr, dl.fdConsuming, len(dl.waitingOnFd))
    go dl.executeDial(dj)
}

func (dl *dialLimiter) addCheckPeerLimit(dj *dialJob) {
    if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
        log.Debugf("[limiter] blocked dial waiting on peer limit; peer: %s; addr: %s; active: %d; "+
            "peer limit: %d; waiting: %d", dj.peer, dj.addr, dl.activePerPeer[dj.peer], dl.perPeerLimit,
            len(dl.waitingOnPeerLimit[dj.peer]))
        wlist := dl.waitingOnPeerLimit[dj.peer]
        dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
        return
    }
    dl.activePerPeer[dj.peer]++

    dl.addCheckFdLimit(dj)
}

// executeDial calls the dialFunc, and reports the result through the response channel when finished. Once the response is sent it also releases all tokens it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
    defer dl.finishedDial(j)
    if j.cancelled() {
        return
    }

    dctx, cancel := context.WithTimeout(j.ctx, j.dialTimeout())
    defer cancel()

    con, err := dl.dialFunc(dctx, j.peer, j.addr)
    select {
    case j.resp <- dialResult{Conn: con, Addr: j.addr, Err: err}:
    case <-j.ctx.Done():
        if err == nil {
            con.Close()
        }
    }
}

三、Backoff 清理

func (db *DialBackoff) init(ctx context.Context) {
    if db.entries == nil {
        db.entries = make(map[peer.ID]map[string]*backoffAddr)
    }
    go db.background(ctx)
}

func (db *DialBackoff) background(ctx context.Context) {
    ticker := time.(BackoffMax)NewTicker
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            db.cleanup()
        }
    }
}

func (db *DialBackoff) cleanup() {
    db.lock.Lock()
    defer db.lock.Unlock()
    now := time.Now()
    for p, e := range db.entries {
        good := false
        for _, backoff := range e {
            backoffTime := BackoffBase + BackoffCoef*time.Duration(backoff.tries*backoff.tries)
            if backoffTime > BackoffMax {
                backoffTime = BackoffMax
            }
            if now.Before(backoff.until.Add(backoffTime)) {
                good = true
                break
            }
        }
        if !good {
            delete(db.entries, p)
        }
    }
}

6.一些重要規則和算法

一、撥號地址的過濾

// filterKnownUndialables takes a list of multiaddrs, and removes those that we definitely don't want to dial: addresses configured to be blocked, IPv6 link-local addresses, addresses without a dial-capable transport, and addresses that we know to be our own. This is an optimization to avoid wasting time on dials that we know are going to fail.
func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Multiaddr {
    lisAddrs, _ := s.InterfaceListenAddresses()
    var ourAddrs []ma.Multiaddr
    for _, addr := range lisAddrs {
        protos := addr.Protocols()
        // we're only sure about filtering out /ip4 and /ip6 addresses, so far
        if len(protos) == 2 && (protos[0].Code == ma.P_IP4 || protos[0].Code == ma.P_IP6) {
            ourAddrs = append(ourAddrs, addr)
        }
    }

    return addrutil.FilterAddrs(addrs,
        addrutil.SubtractFilter(ourAddrs...),
        s.canDial,
        // TODO: Consider allowing link-local addresses
        addrutil.AddrOverNonLocalIP,
        func(addr ma.Multiaddr) bool {
            return s.gater == nil || s.gater.InterceptAddrDial(p, addr)
        },
    )
}

// FilterAddrs is a filter that removes certain addresses, according to the given filters.
// If all filters return true, the address is kept.
func FilterAddrs(a []ma.Multiaddr, filters ...func(ma.Multiaddr) bool) []ma.Multiaddr {
    b := make([]ma.Multiaddr, 0, len(a))
    for _, addr := range a {
        good := true
        for _, filter := range filters {
            good = good && filter(addr)
        }
        if good {
            b = append(b, addr)
        }
    }
    return b
}

// AddrOverNonLocalIP returns whether the addr uses a non-local ip link
func AddrOverNonLocalIP(a ma.Multiaddr) bool {
    split := ma.Split(a)
    if len(split) < 1 {
        return false
    }
    if manet.IsIP6LinkLocal(split[0]) {
        return false
    }
    return true
}

二、撥號地址排序

// ranks addresses in descending order of preference for dialing   Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server
    rankAddrsFnc := func(addrs []ma.Multiaddr) []ma.Multiaddr {
        var localUdpAddrs []ma.Multiaddr // private udp
        var relayUdpAddrs []ma.Multiaddr // relay udp
        var othersUdp []ma.Multiaddr     // public udp

        var localFdAddrs []ma.Multiaddr // private fd consuming
        var relayFdAddrs []ma.Multiaddr //  relay fd consuming
        var othersFd []ma.Multiaddr     // public fd consuming

        for _, a := range addrs {
            if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil {
                if s.IsFdConsumingAddr(a) {
                    relayFdAddrs = append(relayFdAddrs, a)
                    continue
                }
                relayUdpAddrs = append(relayUdpAddrs, a)
            } else if manet.IsPrivateAddr(a) {
                if s.IsFdConsumingAddr(a) {
                    localFdAddrs = append(localFdAddrs, a)
                    continue
                }
                localUdpAddrs = append(localUdpAddrs, a)
            } else {
                if s.IsFdConsumingAddr(a) {
                    othersFd = append(othersFd, a)
                    continue
                }
                othersUdp = append(othersUdp, a)
            }
        }

        relays := append(relayUdpAddrs, relayFdAddrs...)
        fds := append(localFdAddrs, othersFd...)

        return append(append(append(localUdpAddrs, othersUdp...), fds...), relays...)
    }

三、Backoff 時間設定

// BackoffBase is the base amount of time to backoff (default: 5s).
var BackoffBase = time.Second * 5

// BackoffCoef is the backoff coefficient (default: 1s).
var BackoffCoef = time.Second

// BackoffMax is the maximum backoff time (default: 5m).
var BackoffMax = time.Minute * 5

// AddBackoff lets other nodes know that we've entered backoff with peer p, so dialers should not wait unnecessarily. We still will attempt to dial with one goroutine, in case we get through.
//
// Backoff is not exponential, it's quadratic and computed according to the following formula:
//
//     BackoffBase + BakoffCoef * PriorBackoffs^2
//
// Where PriorBackoffs is the number of previous backoffs.
func (db *DialBackoff) AddBackoff(p peer.ID, addr ma.Multiaddr) {
    saddr := string(addr.Bytes())
    db.lock.Lock()
    defer db.lock.Unlock()
    bp, ok := db.entries[p]
    if !ok {
        bp = make(map[string]*backoffAddr, 1)
        db.entries[p] = bp
    }
    ba, ok := bp[saddr]
    if !ok {
        bp[saddr] = &backoffAddr{
            tries: 1,
            until: time.Now().Add(BackoffBase),
        }
        return
    }

    backoffTime := BackoffBase + BackoffCoef*time.Duration(ba.tries*ba.tries)
    if backoffTime > BackoffMax {
        backoffTime = BackoffMax
    }
    ba.until = time.Now().Add(backoffTime)
    ba.tries++
}

7.核心撥號邏輯

func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (transport.CapableConn, *DialError) {
    /*
        This slice-to-chan code is temporary, the peerstore can currently provide
        a channel as an interface for receiving addresses, but more thought
        needs to be put into the execution. For now, this allows us to use
        the improved rate limiter, while maintaining the outward behaviour
        that we previously had (halting a dial when we run out of addrs)
    */
    var remoteAddrChan chan ma.Multiaddr
    if len(remoteAddrs) > 0 {
        remoteAddrChan = make(chan ma.Multiaddr, len(remoteAddrs))
        for i := range remoteAddrs {
            remoteAddrChan <- remoteAddrs[i]
        }
        close(remoteAddrChan)
    }

    log.Debugf("%s swarm dialing %s", s.local, p)

    ctx, cancel := context.WithCancel(ctx)
    defer cancel() // cancel work when we exit func

    // use a single response type instead of errs and conns, reduces complexity *a ton*
    respch := make(chan dialResult)
    err := &DialError{Peer: p}

    defer s.limiter.clearAllPeerDials(p)

    var active int
dialLoop:
    for remoteAddrChan != nil || active > 0 {
        // Check for context cancellations and/or responses first.
        select {
        case <-ctx.Done():
            break dialLoop
        case resp := <-respch:
            active--
            if resp.Err != nil {
                // Errors are normal, lots of dials will fail
                if resp.Err != context.Canceled {
                    s.backf.AddBackoff(p, resp.Addr)
                }

                log.Infof("got error on dial: %s", resp.Err)
                err.recordErr(resp.Addr, resp.Err)
            } else if resp.Conn != nil {
                return resp.Conn, nil
            }

            // We got a result, try again from the top.
            continue
        default:
        }

        // Now, attempt to dial.
        select {
        case addr, ok := <-remoteAddrChan:
            if !ok {
                remoteAddrChan = nil
                continue
            }

            s.limitedDial(ctx, p, addr, respch)
            active++
        case <-ctx.Done():
            break dialLoop
        case resp := <-respch:
            active--
            if resp.Err != nil {
                // Errors are normal, lots of dials will fail
                if resp.Err != context.Canceled {
                    s.backf.AddBackoff(p, resp.Addr)
                }

                log.Infof("got error on dial: %s", resp.Err)
                err.recordErr(resp.Addr, resp.Err)
            } else if resp.Conn != nil {
                return resp.Conn, nil
            }
        }
    }

    if ctxErr := ctx.Err(); ctxErr != nil {
        err.Cause = ctxErr
    } else if len(err.DialErrors) == 0 {
        err.Cause = network.ErrNoRemoteAddrs
    } else {
        err.Cause = ErrAllDialsFailed
    }
    return nil, err
}

一個 Peer 可能有多個地址,對能撥號的地址過濾後,再對這些地址排好序,扔到了這裏,先把這些地址放進 channel,再遍歷 channel 裏的地址(dialLoop):

STEP 1. 檢查上下文和響應
1.一、若是上下文取消,則跳出循環。
1.二、收到響應,並將 active 計數減1, 若是上個循環撥號出錯,則 AddBackoff,並記錄錯誤,若是上個循環撥號成功,則直接將 Conn 返回,二者都不是則繼續下一次循環。

STEP 2. 嘗試撥號
2.1 從 channel 中取出地址,調用 limitedDial 進行撥號(內部會啓動協程去撥號),並將 active 計數加1。
2.2 若是上下文取消,則跳出循環。
2.3 收到響應,並將 active 計數減1, 若是撥號出錯,則 AddBackoff,並記錄錯誤,若是撥號成功,則直接將 Conn 返回, 二者都不是則繼續下一次循環(下一次先檢查上下文和響應)。

STEP 3. 返回錯誤。
dialLoop 結束也沒有返回 Conn 則說明:上下文取消或沒有地址可撥 ,要否則就是撥號出錯,一個地址都沒撥成功。

假設這裏有三個地址 ,由於啓動了協程去撥號,第一個失敗了 ,第二個成功了,在等待第二個成功的響應時,第三個撥號任務可能已經執行了,這時返回 Conn 前,先會執行defer cancel() 第三個撥號任務會收到 cancel 信號,若是第三個撥號任務成功了 ,則將這個 Conn 關閉,詳見dialLimiter.executeDial。再執行和 defer s.limiter.clearAllPeerDials(p),這裏對waitingOnPeerLimit數據進行清理,無論撥號成功仍是失敗對這個 Peer 而言撥號已經結束了。

Netwarps 由國內資深的雲計算和分佈式技術開發團隊組成,該團隊在金融、電力、通訊及互聯網行業有很是豐富的落地經驗。Netwarps 目前在深圳、北京均設立了研發中心,團隊規模30+,其中大部分爲具有十年以上開發經驗的技術人員,分別來自互聯網、金融、雲計算、區塊鏈以及科研機構等專業領域。Netwarps 專一於安全存儲技術產品的研發與應用,主要產品有去中心化文件系統(DFS)、去中心化計算平臺(DCP),致力於提供基於去中心化網絡技術實現的分佈式存儲和分佈式計算平臺,具備高可用、低功耗和低網絡的技術特色,適用於物聯網、工業互聯網等場景。公衆號:Netwarps

相關文章
相關標籤/搜索