libp2p-rs 關於監測指標的實現

模塊地址: https://github.com/netwarps/libp2p-rs/tree/master/swarm/src/metricsgit

libp2p-rs 做爲一個 p2p 網絡項目,有時候咱們可能須要觀察網絡數據的收發狀況,並對其進行收集和彙總。基於這個前提,設計了一個 metric 模塊去實現相關內容。github

metric實現構想

因爲 libp2p 支持鏈接多個 peer,而每一個 peer 支持的 protocol 類型也不盡相同。咱們不但須要彙總收發包的數據,同時也須要根據 peer_id 和 protocol,去分類記錄相應的網絡流量狀況。很明顯,這是一個 key-value 結構,天然會想到使用 HashMap 去存儲相關數據,可是 HashMap 不是一個線程安全的數據結構,那咱們就須要考慮實現一個支持多線程安全併發的 HashMap。安全

安全併發

在設計的初始,首先考慮到的就是使用 Arc 包裹 Mutex 的方式去保證線程安全,但因爲目前的使用場景是統計網絡收發包狀況,若是頻繁進行 lock 的操做,會致使性能極其低下。因而我參考了go-libp2p 的相關 metric 實現,Go 的底層是使用了一個 sync.Map 的結構,經過 Atomic+Mutex 保證了多線程併發安全。所以設計的邏輯就變成了,可否使用 CAS 之類的原子操做,實現一個 lock-free 的 HashMap。網絡

垃圾回收

除了線程安全,還有一種狀況也須要考慮。在Java和Go中,變量使用完後,GC會自動幫咱們執行釋放內存的操做。在 Rust 中,裸指針是指向內存地址的指針,只能經過手動釋放的方式去回收內存;同時,在手動回收的時候,還須要考慮是否有其餘線程正在經過裸指針使用某塊內存地址。而 AtomicPtr 的 compare_and_swap() 方法返回的剛好是一個可變的裸指針(即*mut T),這無疑是一個棘手的問題。數據結構

crossbeam-epoch

針對上述兩種狀況,咱們可使用 Crossbeam-Epoch 來解決遇到的問題。它提供了 Atomic 的相關原子操做和一個延遲刪除的功能。正如其名,epoch 使用世代和延遲隊列的方式,當 local epoch 與 global epoch 相差兩代時,表明能夠安全回收隊列中兩代前的內存地址,彌補了前文提到的裸指針釋放操做帶來的漏洞。crossbeam 經過 epoch 這個機制,保證了全部的對象只有在未被引用的狀況下才會被刪除,避免了出現野指針的狀況。多線程

MetricMap

MetricMap 做爲 Metric 的核心,內部實現是一個包裹了crossbeam_epoch::Atomic 的 HashMap。經過 crossbeam_epoch 提供的 pin(), load(),defer_destroy() 等一系列方法,實現了 lock-free 的 HashMap。閉包

MetricMap 的實現與 go-libp2p 中的 DeepCopyMap 類似,都是經過深拷貝的方式實現 map 結構的替換。Clone() 操做在 map 的數據量較大時,對性能的影響較爲明顯,後續考慮優化相關結構。併發

以 store_or_modify() 方法舉例:分佈式

  1. 首先使用 pin() 方法"pin"住當前 thread,防止全局 epoch 升級致使當前線程的 drop() 方法被調用;
  2. 而後起一個 loop,循環加載 Atomic 中的 HashMap;
  3. 對 HashMap 解引用,因爲在 rust 中解裸指針的引用是不安全的,所以須要用 unsafe 方法包裹;
  4. as_ref() 方法返回的是不可變引用,須要經過 clone() 獲得一份新的 HashMap。若是 key 值存在,經過向閉包傳值獲取新的返回值,更新 value;不然插入新的 key-value;
  5. 調用 Owned::new 爲新的 HashMap 分配一個在堆上的內存地址,執行 CAS 操做;
  6. 若是 CAS 成功,將舊的 HashMap 地址添加到待清除的列表中,這個列表就是前文提到的延遲刪除的隊列。ide

    /// If map contains key, replaces original value with the result that return by F.
    /// Otherwise, create a new key-value and insert.
    pub fn store_or_modify<F: Fn(&K, &V) -> V>(&self, key: &K, value: V, on_modify: F) {
        let guard = crossbeam_epoch::pin();
    
        loop {
            let shared = self.data.load(SeqCst, &guard);
    
            let mut new_hash = HashMap::new();
    
            match unsafe { shared.as_ref() } {
                Some(old_hash) => {
                    new_hash = old_hash.clone();
                    if let Some(old_value) = new_hash.get(key) {
                        let new_value = on_modify(key, old_value);
                        new_hash.insert(key.clone(), new_value.clone());
                    } else {
                        new_hash.insert(key.clone(), value.clone());
                    }
                }
                None => {
                    new_hash.insert(key.clone(), value.clone());
                }
            }
    
            let owned = Owned::new(new_hash);
    
            match self.data.compare_and_set(shared, owned, SeqCst, &guard) {
                Ok(_) => {
                    unsafe {
                        guard.defer_destroy(shared);
                        break;
                    }
                    // break;
                }
                Err(_e) => {}
            }
        }
    }

Metric

Metric 的主體實現以下,能夠看到與 peer 和 protocol 相關的數據結構都是基於 MetricMap 的。總數據包的個數和字節數大小不須要區分,因此直接使用 std 的 AtomicUize 便可:

pub struct Metric {
    /// The accumulative counter of packets sent.
    pkt_sent: AtomicUsize,
    /// The accumulative counter of packets received.
    pkt_recv: AtomicUsize,
    /// The accumulative counter of bytes sent.
    byte_sent: AtomicUsize,
    /// The accumulative counter of bytes received.
    byte_recv: AtomicUsize,

    /// A hashmap that key is protocol name and value is a counter of bytes received.
    protocol_in: MetricMap<ProtocolId, usize>,
    /// A hashmap that key is protocol name and value is a counter of bytes sent.
    protocol_out: MetricMap<ProtocolId, usize>,

    /// A hashmap that key is peer_id and value is a counter of bytes received.
    peer_in: MetricMap<PeerId, usize>,
    /// A hashmap that key is peer_id and value is a counter of bytes sent.
    peer_out: MetricMap<PeerId, usize>,
}

總結

以上是 Metric 相關結構從實現到完工,中間如有理解上的錯誤,還請各位不吝賜教。目前而言,MetricMap 的設計適合於一次新增屢次修改的狀況。後續考慮經過起一個 Web Server 的方式,經過 Restful API 的方式暴露相關監控數據,方便在外部查看。


Netwarps 由國內資深的雲計算和分佈式技術開發團隊組成,該團隊在金融、電力、通訊及互聯網行業有很是豐富的落地經驗。Netwarps 目前在深圳、北京均設立了研發中心,團隊規模30+,其中大部分爲具有十年以上開發經驗的技術人員,分別來自互聯網、金融、雲計算、區塊鏈以及科研機構等專業領域。Netwarps 專一於安全存儲技術產品的研發與應用,主要產品有去中心化文件系統(DFS)、去中心化計算平臺(DCP),致力於提供基於去中心化網絡技術實現的分佈式存儲和分佈式計算平臺,具備高可用、低功耗和低網絡的技術特色,適用於物聯網、工業互聯網等場景。公衆號:Netwarps

相關文章
相關標籤/搜索