rust 版本的 peerstore 落地實踐

模塊地址: https://github.com/netwarps/libp2p-rs/blob/master/core/src/peerstore.rsgit

在rust-libp2p中,當協議想要獲取peer_id所對應的地址時,須要實現NetworkBehaviour的addresses_of_peer方法。與之不一樣的是,go-libp2p使用peerstore來存儲了peer_id與address之間的關係,所以咱們能夠參考它來實現一個rust版本的peerstore。github

實現構想

首先,因爲咱們的啓動核心是swarm,那peerstore會做爲其中的一個屬性。其次,在go-libp2p-peerstore中,peerstore主要存放的數據爲三塊:地址信息的AddrBook,公私鑰信息的KeyBook,協議信息的ProtoBook;咱們能夠將三者結合在一塊兒組成一個新的struct,取名爲PeerRecord,放在以peer_id爲key的Hashmap中。json

垃圾回收機制

GC存在的意義就是防止hashmap過分膨脹。因爲網絡狀態時時刻刻都在發生變化,peer之間的鏈接也可能隨之變化,而peerstore有一個重要的做用就是存放peer的地址信息。若是不對已經失效的peer信息進行清理,就會影響到peerstore的工做效率。安全

目前實現的效果是,在swarm的start方法中使用task::spawn啓動一個任務,建立一個mpsc的channel,使用select語法等待管道傳來的消息或者 task 等待10分鐘的邏輯完成,針對某些地址,若是已經超出ttl的限制,清理當前地址;同時,若是當前peer_id的地址集合中不存在任何的地址信息,就將其從Hashmap中移除。網絡

Pinned

雖然GC機制的存在,使Hashmap不會無限制擴容,良好地幫助了系統的運行。可是仍然有些不足的地方,考慮以下這種狀況:併發

對於KAD協議來講,peer須要不停地進行迭代查詢已知節點,逐步填充本身的KBucket,這是一個耗時的過程。若是在gc時,將較早查詢到的節點地址信息從peerstore中移除了,那麼又須要從新啓用迭代查詢去獲取地址,所以咱們在PeerRecord中添加了一個bool值pinned。GC時會判斷這條記錄的類別,若是pinned爲true,就會跳過清理的步驟。async

序列化與持久化

對每個peer來講,由於某些緣由須要下線或停機時,存放在peerstore裏的節點信息是不該該被丟棄的。而對於須要持久化的數據,也須要進行序列化操做,便於存放。分佈式

在libp2p-rs中,主循環的調用也是經過task::spawn啓動的。當swarm接收到close的消息時,將會退出事件處理的循環,並向運行peerstore的gc線程發送一個close()的事件,結束gc的過程。接下來調用peerstore的save_data()方法,將數據使用serde序列化成json格式,並使用std::io將序列化後的數據存放到根目錄的txt文件中。ide

方法分析

以GC方法進行解析:oop

  1. swarm主循環spawn運行task,每十分鐘觸發一次select。
  2. Hashmap被Arc<Mutex>包裹,能夠經過lock()獲取,保證併發安全。
  3. 若是該peer的信息不是經過kad獲取的,調用retain篩選未超出ttl時限的地址。
  4. 若是當前peer的地址數據已經清空,從hashmap中移除這個peer。

    // swarm/lib.rs
    // The GC task is to remove all expired addresses from the peer store
        task::spawn(async move {
            log::info!("starting Peerstore GC...");
            loop {
                let either = future::select(rx.next(), task::sleep(PEERSTORE_GC_PURGE_INTERVAL).boxed()).await;
                match either {
                    Either::Left((_, _)) => break,
                    Either::Right((_, _)) => peer_store.remove_expired_addrs(),
                }
            }
            log::info!("quitting Peerstore GC...");
        });
    
    // core/peerstore.rs
    /// Removes all expired address.
    pub fn remove_expired_addrs(&self) {
        let mut to_remove = vec![];
        let mut guard = self.inner.lock().unwrap();
        for (peer, pr) in guard.iter_mut() {
            if !pr.pinned {
                log::debug!("GC attempt for {:?}", peer);
                pr.addrs.retain(|record| record.expiry.elapsed() < record.ttl);
                // delete this peer if no addr at all
                if pr.addrs.is_empty() {
                    log::debug!("remove {:?} from peerstore", peer);
                    to_remove.push(peer.clone());
                }
            }
        }
    
        for peer in to_remove {
            guard.remove(&peer);
        }
    }

Netwarps 由國內資深的雲計算和分佈式技術開發團隊組成,該團隊在金融、電力、通訊及互聯網行業有很是豐富的落地經驗。Netwarps 目前在深圳、北京均設立了研發中心,團隊規模30+,其中大部分爲具有十年以上開發經驗的技術人員,分別來自互聯網、金融、雲計算、區塊鏈以及科研機構等專業領域。Netwarps 專一於安全存儲技術產品的研發與應用,主要產品有去中心化文件系統(DFS)、去中心化計算平臺(DCP),致力於提供基於去中心化網絡技術實現的分佈式存儲和分佈式計算平臺,具備高可用、低功耗和低網絡的技術特色,適用於物聯網、工業互聯網等場景。公衆號:Netwarps

相關文章
相關標籤/搜索