異步代碼的幾種寫法 | Rust學習筆記

做者:謝敬偉,江湖人稱「刀哥」,20年IT老兵,數據通訊網絡專家,電信網絡架構師,目前任Netwarps開發總監。刀哥在操做系統、網絡編程、高併發、高吞吐、高可用性等領域有多年的實踐經驗,並對網絡及編程等方面的新技術有濃厚的興趣。

Rust歷史不長,仍然處於快速發展的歷程中。關於異步編程的模式,如今已經發展到async/await協程的高級階段。大概是由於async/await出現的時間還不長,因此現有大多數的開源項目並非或不是純粹使用async/await來書寫的,而是前先後後有多種的寫法。這樣的情況給Rust的學習帶來了一些的難度。在這裏,咱們來捋一捋異步代碼的幾種寫法。git

mio

最原始的方式是使用mio進行開發。mio是一個底層異步I/O庫,提供非阻塞方式的API,具備很高的性能。實際上mio是對於操做系統epoll/kqueue/IOCP的封裝。在C/C++中咱們使用libevent之類的庫,mio能夠理解爲對應的Rust版本。基於mio的代碼大體以下:程序員

loop {
     // Poll Mio for events, blocking until we get an event.
     poll.poll(&mut events, None)?;

     // Process each event.
     for event in events.iter() {
         if event.is_writable() {
             // socket可寫,開始發送數據
         }

         if event.is_readable() {
             // socket可讀,開始接收數據
         }
         // socket 關閉,退出循環
         return Ok(());
     }
 }

總的來講,這是徹底基於異步事件通知的寫法,和C/C++區別不是很大,異步代碼對於程序員是一個挑戰,當代碼邏輯愈來愈複雜,添加新功能或是解決已有問題的難度也愈來愈大。編程

另外,mio實現的是一個單線程事件循環,雖然能夠處理成千上萬路的I/O操做,但沒有多線程的能力,須要本身擴充。安全

Future Poll

爲了更好地規範異步的邏輯,Rust抽象出Future表示還沒有發生的事物。這些Future能夠用不少方式組合成一個更復雜的複合Future來表明一系列的事件。Future須要程序主動去poll(輪詢)才能獲取到最終的結果,每一次輪詢的結果多是Ready或者Pending微信

運行庫提供ExecutorReactor來執行Future,也就是調用Futurepoll方法循環執行一系列就緒的Future,當Future返回Pending的時候,會將Future轉移到Reactor上等待喚醒。Reactor被用來負責喚醒以前沒法完成的Future。事實上,tokioReactor是基於mio實現的,而async-std/smol則是封裝了epoll/kqueue/IOCP,提供相似的功能。網絡

手動實現Future是一件相對繁瑣的工做,主要的問題在於異步模式自己的特性。例如,接收網絡數據,沒法臆測每次輪詢會收到多少字節的數據,每每須要開闢一段接收緩衝區容納數據,協議解碼也須要一個狀態機拼包向上層提交;發送網絡數據存在類似問題,發送數據時底層未就緒,則緩衝發送數據,待下次輪詢時,須要首先檢查並處理髮送緩衝區。另外還有一些值得注意的地方,若是手動實現的Future返回Pending,則必須本身實現喚醒機制,也就是須要將cx克隆一份記下來,而後在適當的時侯調用cx.wake()。由於網絡相關的功能每每是分層的,所以手動的Poll循環也會是層層堆疊的,這時候,返回值Poll::Ready(T)就有學問了。泛型T可能包裹各類不一樣的數據,Option<T>Result<T,E>,或者二者的組合。由於最外層還有一個Poll<T>,全部這時候的match語句寫起來會很是臃腫,粘貼複製寫不少代碼,完成的功能卻很是有限,並且因爲這些代碼很類似,大大增長了出錯的可能性。多線程

標準庫中僅僅定義了Future,更多的相關功能須要引用futures-rs類庫,裏面定義了一系列有關異步的操做,包括StreamSinkAsyncReadAsyncWrite等基礎Trait,以及對應實現了大量方便操做的組合子的Ext Trait,特別用途的fusedBoxTry系列的擴展,諸如join!select!pin_mut!等一系列的宏。理論上,不使用這些擴展也能寫出代碼,只不過那樣的代碼極可能篇幅會長的可怕。值得一提的是,除了一些能夠簡化代碼的過程宏以外,擴展Trait提供的組合子也會讓代碼精簡很多。好比Future::and_then可讓代碼寫成鏈式調用的方式;Sink::send包裝了Sink發送三步驟 poll_ready/start_send/poll_flush,使用.await一行代碼直接就能夠完成發送。所以,不少poll方式的代碼其實是準確地說是混合式的,其中也使用了很多async代碼塊。架構

總之,搞清楚Future相關的這些內容是須要花費很多時間,更不用說用它們來寫代碼了。不過,即使是使用async/await這種更高級原語,也是有必要了解底層的工做原理和實現機制,所謂知其然知其因此然。併發

async/await

使用async/await能夠將異步的代碼寫得相似同步的過程,更加符合人體工程學。由於async被翻譯爲一個Future狀態機,原先在poll方式中須要處理的與Pending相關的狀態如今都由async生成的狀態機自動完成,所以大大減輕了程序員的心智負擔。異步

如前所述,底層的Futures提供了不少方便的組合子擴展Future,使用起來很簡潔,能夠極大地簡化代碼。例如,上文提到過的Sink::send包裝了發送緩衝區的實現和異步發送的三個步驟;AsyncRead::read_exact實現了讀取指定字節數的功能,在處理網絡協議解析時能夠避免手寫一個拼包狀態機;AsyncWrite::write_all實現了發送所有數據以及發送緩衝,等等。正是在這些底層功能的支持下,async/await成爲了更高級的書寫異步代碼的方式。也許會有少量擔憂,這樣所謂「高級」會不會在性能上有很大損失?筆者我的不這麼認爲。自動實現的狀態機也許未必比程序員手動完成的性能更差。狀態機編程對於任何人,即使是一個有經驗的程序員都是不小挑戰。蹩腳的狀態機實現不只可能有性能問題,更大的風險來自於實現上的漏洞,以及維護上的困難。代碼寫出來更可能是給別人看的,完成一樣的功能,簡潔的代碼更有多是更高質量的代碼。

如下例子是固定長度分割的報文接收過程,使用async/await是很簡單的。若是實現爲一個Stream/poll_next,代碼會複雜不少。

/// convenient method for reading a whole frame
    pub async fn recv_frame(&mut self) -> io::Result<Vec<u8>> {
        let mut len = [0; 4];
        let _ = self.inner.read_exact(&mut len).await?;  // inner socket, 支持 AsyncRead

        let n = u32::from_be_bytes(len) as usize;
        if n > self.max_frame_len {
            let msg = format!(
                "data length {} exceeds allowed maximum {}",
                n, self.max_frame_len
            );
            return Err(io::Error::new(io::ErrorKind::PermissionDenied, msg));
        }

        let mut frame = vec![0; n];
        self.inner.read_exact(&mut frame).await?;

        Ok(frame)
    }

最後,徹底使用async/await寫代碼目前還有幾個問題:

  • async trait

當前Trait 不支持 async fn,沒法直接用Trait來抽象異步方法。暫時解決辦法是使用三方庫 async-trait。以下:

use async_trait::async_trait;

#[async_trait]
trait Advertisement {
    async fn run(&self);
}

async_trait將代碼轉換爲一個返回 Pin<Box<dyn Future + Send + 'async>> 的同步方法。由於裝箱和動態派發的緣由,性能上會有少量損失。

  • 異步析構

當前drop方法必須是同步調用,不能使用await語法。當一個I/O對象越過生命週期被析構,每每在關閉底層句柄以前,還須要完成某些I/O操做。好比,通知網絡對端鏈接已經關閉。在同步代碼中,咱們只須要在drop()中置入這些操做,可是在異步代碼中,沒法在drop()中作相似的事情。

解決辦法,老是在異步I/O對象越過生命週期以前顯式地執行關閉動做,或是,實現一個相似GC的功能,專門負責清理工做。

展望

筆者在學習Rust過程當中,主要關注網絡相關的併發編程。由於以前有在Go版本的ipfs/libp2p上的開發經驗,故而學習研究了rust-libp2p以及nervos tentaclerust-libp2pParity實現的準官方版本,可是這個項目的代碼及其難懂,過於強調使用泛型參數的抽象,致使代碼可讀性很是差。請教了代碼做者,他認可代碼可能有些複雜,但也強調都是有緣由的... nervos tentacle的實如今協議上不夠完整,特別是與標準libp2p並不兼容。兩個項目共有的特色是主要用poll的方式寫代碼,邏輯上都是狀態機的嵌套。

所以,筆者試圖徹底使用async/await方式重構libp2p,參考rust-libp2p的實現,代碼協程化,向上層提供純粹的異步接口,爭取在API層面的體驗接近go-libp2p,這是推廣Rust協程機制的一個嘗試,同時也是我的的一個學習的過程。目前剛剛起步,僅完成了secioyamux部分,待合適時機開源,指望更多Rust愛好者共同來開發完善。

參考:
Asynchronous Destructors

深圳星鏈網科科技有限公司(Netwarps),專一於互聯網安全存儲領域技術的研發與應用,是先進的安全存儲基礎設施提供商,主要產品有去中心化文件系統(DFS)、企業聯盟鏈平臺(EAC)、區塊鏈操做系統(BOS)。 微信公衆號:Netwarps
相關文章
相關標籤/搜索