libp2p-rs swarm 撥號設計與實現

前面咱們對go-libp2p中swarm撥號源碼進行了分析(【go-libp2p源碼剖析】Swarm撥號),參考go-libp2p,咱們在libp2p-rs上完成swarm撥號功能的開發。功能基本上和go-libp2p保持一致,稍微作了精簡,去掉了go-libp2p撥號的部分功能,如DialSync中的同步撥號限制。下面對libp2p-rs swarm撥號功能的實現作一個詳細的說明。git

代碼組織結構

倉庫地址:https://github.com/netwarps/libp2p-rs.git
撥號相關代碼主要分佈在swarm/src/lib.rsswarm/src/dial.rs兩個文件中github

類圖以下:
在這裏插入圖片描述安全

  • 撥號實現主要圍繞AsyncDialer展開,它組合了DialLimiter和Backoff的功能,AsyncDialer實現了撥號的重試,撥號任務的啓動及撥號結果的收集和反饋。撥號默認不重試,能夠經過修改環境變量LIBP2P_SWARM_DIAL_ATTEMPTS對重試次數作修改。
  • DialParam 包裝了多個撥號須要的參數,在AsyncDialer的方法之間傳遞
  • Transports能夠根據撥號地址匹配合適的Transport去撥號(好比是TCP仍是WebSocket)
  • DialBackoff 對Peer的撥號失敗的地址作了標記,避免頻繁撥號
  • DialLimiter 對併發撥號數作了限制,默認100,也能夠經過修改環境變量LIBP2P_SWARM_DIAL_LIMIT對併發撥號數作修改

    工做流程

    時序圖以下:
    在這裏插入圖片描述網絡

    1. 經過control發送一個命令給swarm,能夠調用new_connection建立一個新的鏈接,再建立stream,也能夠直接調用open_stream建立stream。Swarm接收到命令後,調用on_new_connection或on_new_stream。在on_new_stream中若是connection存在直接拿出connection建立stream,若是不存在則去撥號建立一個新的connection再建立stream。最後調用dial_peer對peer進行撥號,在這裏會將撥號要用到的參數從Swarm複製到DialParam。

注:咱們撥號沒有將connection直接返回(由於只有在open_stream時纔用到了connection,若是將值返回顯得有點多餘,返回可變引用又會有生命週期相關問題)。這裏會構造一個閉包(主要用來打開流並返回流),最終在ConnectionEstablished事件或OutgoingConnectionError事件處理函數中執行這個閉包。
因爲撥號須要啓動多個task,若是一路傳遞下去的話,閉包須要支持clone才行,閉包捕獲了外部的oneshot::Sender,它不支持clone,因此爲求方便咱們將閉包暫存在Swarm裏的dial_transactions中,它是一個hashmap數據結構,key值是每次操做生成的惟一值,咱們命名爲TransactionId。這個TransactionId最終會帶到ConnectionEstablished事件或OutgoingConnectionError事件對應的處理函數,最後咱們能夠根據TransactionId將閉包remove出來執行。數據結構

部分代碼片斷閉包

type DialCallback = Box<dyn FnOnce(Result<&mut Connection>) + Send>;
fn on_new_stream(&mut self, peer_id: PeerId, pids: Vec<ProtocolId>, reply: oneshot::Sender<Result<Substream>>) -> Result<()> {
        if let Some(connection) = self.get_best_conn(&peer_id) {
            ......
        } else {
            // dialing peer, and opening a new stream in the post-processing callback
            self.dial_peer(peer_id.clone(), |r: Result<&mut Connection>| {
                match r {
                    Ok(connection) => {
                        connection.open_stream(pids, |r| {
                            let _ = reply.send(r.map_err(|e| e.into()));
                        });
                    }
                    Err(e) => {
                        let _ = reply.send(Err(e));
                    }
                }
            });
        }
        Ok(())
    }
fn dial_peer<F: FnOnce(Result<&mut Connection>) + Send + 'static>(&mut self, peer_id: PeerId, f: F) {
        ......

           // allocate transaction id and push box::f into hashmap for post-processing
        let tid = self.assign_tid();
        self.dial_transactions.insert(tid, Box::new(f));
        self.dialer
            .dial(peer_id, self.transports.clone(), addrs, self.event_sender.clone(), tid);
    }
fn handle_connection_opened(&mut self, stream_muxer: IStreamMuxer, dir: Direction, tid: Option<TransactionId>) -> Result<()> {
 ......
    // dial callback for post-processing
        // note that it must cleanup the tid entry
        if let Some(id) = tid {
            // the entry must be there
            let callback = self.dial_transactions.remove(&id).expect("no match tid found");
            callback(Ok(&mut connection));
        }
   ......
 }
  1. Swarm撥號時會調用AsyncDialer的dial方法。這裏首先啓動一個新的task,再調用start_dialing方法。start_dialing方法實現了對撥號的重試功能,它會等待撥號結果,將撥號結果返回給dial,成功則發送ConnectionEstablished事件,失敗則發送OutgoingConnectionError事件,在事件處理函數中會直接直接第一步傳入的閉包。併發

    pub(crate) fn dial(
        &self,
        peer_id: PeerId,
        transports: Transports,
        addrs: EitherDialAddr,
        mut event_sender: mpsc::UnboundedSender<SwarmEvent>,
        tid: TransactionId,
    ) {
        let dial_param = DialParam {
            transports,
            addrs,
            peer_id,
            tid,
            limiter: self.limiter.clone(),
            backoff: self.backoff.clone(),
            attempts: self.attempts,
        };
    
        task::spawn(async move {
            let tid = dial_param.tid;
            let peer_id = dial_param.peer_id.clone();
    
            let r = AsyncDialer::start_dialing(dial_param).await;
            match r {
                Ok(stream_muxer) => {
                    let _ = event_sender
                        .send(SwarmEvent::ConnectionEstablished {
                            stream_muxer,
                            direction: Direction::Outbound,
                            tid: Some(tid),
                        })
                        .await;
                }
                Err(err) => {
                    let _ = event_sender
                        .send(SwarmEvent::OutgoingConnectionError { tid, peer_id, error: err })
                        .await;
                }
            }
        });
    }
    async fn start_dialing(dial_param: DialParam) -> Result<IStreamMuxer> {
        let mut dial_count: u32 = 0;
        loop {
            dial_count += 1;
    
            let active_param = dial_param.clone();
            let r = AsyncDialer::dial_addrs(active_param).await;
            if let Err(e) = r {
                log::info!("[Dialer] dialer failed at attempt={} error={:?}", dial_count, e);
                if dial_count < dial_param.attempts {
                    log::info!(
                        "[Dialer] All addresses of {:?} cannot be dialed successfully. Now try dialing again, attempts={}",
                        dial_param.peer_id,
                        dial_count
                    );
                    //TODO:
                    task::sleep(BACKOFF_BASE).await;
                } else if dial_param.attempts > 1 {
                    break Err(SwarmError::MaxDialAttempts(dial_param.attempts));
                } else {
                    break Err(e);
                }
            } else {
                break r;
            }
        }
    }
  2. start內部調用了dial_addrs,即對peer的多個地址同時進行撥號。首先檢查backoff,若是剛撥號失敗過,則直接返回錯誤。而後針對每一個地址構造一個DialJob,每一個DialJob啓動一個task調用limiter的do_dial_job作撥號檢查和撥號操做,由於不知道task啥時候能撥號完成,這裏傳了一個channel tx進去,只要撥號完成就會發回一個消息,再在外面接收,啓動幾個task就接收幾回channel rx的消息,一旦發現有成功的撥號,就將結果直接返回。那些後面再撥號成功的,咱們不關心,讓它們自動銷燬;對那些撥號失敗的添加backoff,避免對失敗地址頻繁撥號。app

    let (tx, rx) = mpsc::unbounded::<(Result<IStreamMuxer>, Multiaddr)>();
        let mut num_jobs = 0;
        for addr in addrs_rank {
            // first of all, check the transport
            let r = param.transports.lookup_by_addr(addr.clone());
            if r.is_err() {
                log::info!("[Dialer] no transport found for {:?}", addr);
                continue;
            }
    
            num_jobs += 1;
    
            let dj = DialJob {
                addr,
                peer: peer_id.clone(),
                tx: tx.clone(),
                transport: r.unwrap(),
            };
            // spawn a task to dial
            let limiter = self.limiter.clone();
            task::spawn(async move {
                limiter.do_dial_job(dj).await;
            });
        }
         log::trace!("total {} dialing jobs started, collecting...", num_jobs);
        self.collect_dialing_result(rx, num_jobs, param).await
async fn collect_dialing_result(&self, mut rx: UnboundedReceiver<(Result<IStreamMuxer>, Multiaddr)>, jobs: u32, param: DialParam) -> Result<IStreamMuxer> {
        for i in 0..jobs {
            let peer_id = param.peer_id.clone();
            log::trace!("[Dialer] receiving dial result, finished jobs={} ...", i);
            let r = rx.next().await;
            match r {
                Some((Ok(stream_muxer), addr)) => {
                    let reported_pid = stream_muxer.remote_peer();
                    if peer_id == reported_pid {
                        return Ok(stream_muxer);
                    } else {
                        self.backoff.add_peer(peer_id, addr).await;
                    }
                }
                Some((Err(err), addr)) => {
                    if let SwarmError::Transport(_) = err {
                        self.backoff.add_peer(peer_id, addr).await;
                    }
                }
                None => {
                    log::warn!("[Dialer] should not happen");
                }
            }
        }
        return Err(SwarmError::AllDialsFailed);
    }
  1. 相對go的實現DialLimiter作了精簡,去掉了等待列表,失敗的咱們不會放到waiting列表裏作撥號,而是直接返回錯誤。AsyncDialer的dial_addrs會調用do_dial_job。do_dial_job中會判斷當前正在撥號的數量,若是數量超過咱們的限制,則直接返回ConcurrentDialLimit錯誤。不然給併發數加1,並調用execute_dial作實際的撥號操做,撥號完成併發數減1。這裏對transport的撥號加了一個超時的封裝(本地地址默認5秒超時,外部地址默認60s超時),若是超時則直接返回DialTimeout錯誤。無論撥號成功與否都經過channel將消息送回給AsyncDialer。async

    async fn do_dial_job(&self, mut dj: DialJob) {
        if self.dial_consuming.load(Ordering::SeqCst) >= self.dial_limit {
            let _ = dj.tx.send((Err(SwarmError::ConcurrentDialLimit(self.dial_limit)), dj.addr)).await;
            return;
        }
        self.dial_consuming.fetch_add(1, Ordering::SeqCst);
        self.execute_dial(dj).await;
    }
    fn dial_timeout(&self, ma: &Multiaddr) -> Duration {
        let mut timeout: Duration = DIAL_TIMEOUT;
        if ma.is_private_addr() {
            timeout = DIAL_TIMEOUT_LOCAL;
        }
        timeout
    }
    async fn execute_dial(&self, mut dj: DialJob) {
        let timeout = self.dial_timeout(&dj.addr);
    
        let dial_r = future::timeout(timeout, dj.transport.dial(dj.addr.clone())).await;
        if let Ok(r) = dial_r {
            let _ = dj.tx.send((r.map_err(|e|e.into()), dj.addr)).await;
        } else {
            let _ = dj.tx.send((Err(SwarmError::DialTimeout(dj.addr.clone(), timeout.as_secs())), dj.addr)).await;
        }
        self.dial_consuming.fetch_sub(1, Ordering::SeqCst);
    }

Netwarps 由國內資深的雲計算和分佈式技術開發團隊組成,該團隊在金融、電力、通訊及互聯網行業有很是豐富的落地經驗。Netwarps 目前在深圳、北京均設立了研發中心,團隊規模30+,其中大部分爲具有十年以上開發經驗的技術人員,分別來自互聯網、金融、雲計算、區塊鏈以及科研機構等專業領域。
Netwarps 專一於安全存儲技術產品的研發與應用,主要產品有去中心化文件系統(DFS)、去中心化計算平臺(DCP),致力於提供基於去中心化網絡技術實現的分佈式存儲和分佈式計算平臺,具備高可用、低功耗和低網絡的技術特色,適用於物聯網、工業互聯網等場景。
公衆號:Netwarps分佈式

相關文章
相關標籤/搜索