剝開比原看代碼03:比原是如何監聽p2p端口的

做者:freewindnode

比原項目倉庫:git

Github地址:https://github.com/Bytom/bytomgithub

Gitee地址:https://gitee.com/BytomBlockchain/bytomapi

咱們知道,在使用bytomd init --chain_id mainnet/testnet/solonet初始化比原的時候,它會根據給定的chain_id的不一樣,使用不一樣的端口(參看config/toml.go#L29):tcp

  1. mainnet(鏈接到主網): 46657
  2. testnet(鏈接到測試網): 46656
  3. solonet(本地單獨節點): 46658

對於我來講,因爲只須要對本地運行的一個比原節點進行分析,因此能夠採用第3個chain_id,即solonet。這樣它啓動以後,不會與其它的節點主動鏈接,能夠減小其它節點對於咱們的干擾。函數

因此在啓動的時候,個人命令是這樣的:測試

cd cmd/bytomd
./bytomd init --chain_id solonet
./bytomd node

它就會監聽46658端口,等待其它節點的鏈接。fetch

連上看看

若是這時咱們使用telnet來鏈接其46658端口,成功鏈接上以後,能夠看到它會發給咱們一些亂碼,大概以下:spa

$ telnet localhost 46658
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
ט�S��%�z?��_�端��݂���U[e

咱們也許會好奇,它發給咱們的究竟是什麼?設計

可是這個問題留待下次回答,由於首先,比原節點必須可以監聽這個端口,咱們才能連上。因此此次咱們的問題是:

比原在代碼中是如何監聽這個端口的?

端口已經寫在config.toml

在前面,當咱們使用./bytomd init --chain_id solonet初始化比原之後,比原會在本地的數據目錄中生成一個config.toml的配置文件,內容大約以下:

# This is a TOML config file.
# For more information, see https://github.com/toml-lang/toml
fast_sync = true
db_backend = "leveldb"
api_addr = "0.0.0.0:9888"
chain_id = "solonet"
[p2p]
laddr = "tcp://0.0.0.0:46658"
seeds = ""

其中[p2p]下面的laddr,就是該節點監聽的地址和端口。

對於laddr = "tcp://0.0.0.0:46658",它是意思是:

  1. 使用的是tcp協議
  2. 監聽的ip是0.0.0.0,是指監聽本機全部ip地址。這樣該節點既容許本地訪問,也容許外部主機訪問。若是你只想讓它監聽某一個ip,手動修改該配置文件便可
  3. 46658,就是咱們在這個問題中關注的端口了,它與該節點與其它節點交互數據使用的端口

比原在監聽這個端口的時候,並非如我最開始預期的直接調用net.Listen監聽它。實際的過程要比這個複雜,由於比原設計了一個叫Switch的對象,用來統一管理與外界相關的事件,包括監聽、鏈接、發送消息等。而Switch這個對象,又是在SyncManager中建立的。

啓動直到進入Switch

因此咱們首先須要知道,比原在源代碼中是如何啓動,而且一步步走進了Switch的世界。

首先仍是當咱們bytomd node啓動比原時,對應的入口函數以下:

cmd/bytomd/main.go#L54

func main() {
    cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir()))
    cmd.Execute()
}

它又會根據傳入的node參數,運行下面的函數:

cmd/bytomd/commands/run_node.go#L41

func runNode(cmd *cobra.Command, args []string) error {
    // Create & start node
    n := node.NewNode(config)
    // ...
}

咱們須要關注的是node.NewNode(config)函數,由於是在它裏面建立了SyncManager

node/node.go#L59

func NewNode(config *cfg.Config) *Node {
    // ...
    syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
    // ...
}

在建立SyncManager的時候,又建立了Switch:

netsync/handle.go#L42

func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
    // ...
    manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)

    // ...
    protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
    manager.sw.AddReactor("PROTOCOL", protocolReactor)

    // Create & add listener
    p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
    l := p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP, nil)
    manager.sw.AddListener(l)

    // ...
}

這裏須要注意一下,上面建立的protocolReactor對象,是用來處理當有節點鏈接上端口後,雙方如何交互的事情。跟此次問題「監聽端口」沒有直接關係,可是這裏也能夠注意一下。

而後又建立了一個DefaultListener對象,而監聽端口的動做,就是在它裏面發生的。Listener建立以後,將會添加到manager.sw(即Switch)中,用於在那邊進行外界數據與事件的交互。

監聽端口

NewDefaultListener中作的事情比較多,因此咱們把它分紅幾塊說:

p2p/listener.go#L52

func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlog.Logger) Listener {
    // Local listen IP & port
    lAddrIP, lAddrPort := splitHostPort(lAddr)

    // Create listener
    var listener net.Listener
    var err error
    for i := 0; i < tryListenSeconds; i++ {
        listener, err = net.Listen(protocol, lAddr)
        if err == nil {
            break
        } else if i < tryListenSeconds-1 {
            time.Sleep(time.Second * 1)
        }
    }
    if err != nil {
        cmn.PanicCrisis(err)
    }

    // ...

上面這部分就是真正監聽的代碼了。經過Go語言提供的net.Listen函數,監聽了指定的地址。另外,在監聽的時候,進行了屢次嘗試,由於當一個剛剛被使用的端口被放開後,還須要一小段時間才能真正釋放,因此這裏須要多嘗試幾回。

其中tryListenSeconds是一個常量,值爲5,也就是說,大約會嘗試5秒鐘,要是都綁定不上,纔會真正失敗,拋出錯誤。

後面省略了一些代碼,主要是用來獲取當前監聽的實際ip以及外網ip,並記錄在日誌中。本想在這裏簡單講講,可是發現還有點麻煩,因此打算放在後面專開一個問題。

其實本次問題到這裏就已經結束了,由於已經完成了「監聽」。可是後面還有一些初始化操做,是爲了讓比原能夠跟鏈接上該端口的節點進行交互,也值得在這裏講講。

接着剛纔的方法,最後的部分是:

dl := &DefaultListener{
        listener:    listener,
        intAddr:     intAddr,
        extAddr:     extAddr,
        connections: make(chan net.Conn, numBufferedConnections),
    }
    dl.BaseService = *cmn.NewBaseService(logger, "DefaultListener", dl)
    dl.Start() // Started upon construction
    return dl
}

須要注意的是connections,它是一個帶有緩衝的channel(numBufferedConnections值爲10),用來存放鏈接上該端口的鏈接對象。這些操做將在後面的dl.Start()中執行。

dl.Start()將調用DefaultListener對應的OnStart方法,以下:

p2p/listener.go#L114

func (l *DefaultListener) OnStart() error {
    l.BaseService.OnStart()
    go l.listenRoutine()
    return nil
}

其中的l.listenRoutine,就是執行前面所說的向connections channel裏放入鏈接的函數:

p2p/listener.go#L126

func (l *DefaultListener) listenRoutine() {
    for {
        conn, err := l.listener.Accept()
        // ...
        l.connections <- conn
    }

    // Cleanup
    close(l.connections)

    // ...
}

SwitchSyncManager啓動的時候會被啓動,在它的OnStart方法中,會拿到全部Listener(即監聽端口的對象)中connectionschannel中的鏈接,與它們交互。

https://github.com/freewind/bytom-v1.0.1/blob/master/p2p/switch.go#L498

func (sw *Switch) listenerRoutine(l Listener) {
    for {
        inConn, ok := <-l.Connections()
        if !ok {
            break
        }
        // ...

        err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
        // ...
    }

其中sw.addPeerWithConnectionAndConfig就是與對應節點進行交互的邏輯所在,可是這已經超出了本次問題的範疇,下次再講。

到此爲止,本次的問題,應該已經講清楚了。

相關文章
相關標籤/搜索