做者:謝敬偉,江湖人稱「刀哥」,20年IT老兵,數據通訊網絡專家,電信網絡架構師,目前任Netwarps開發總監。刀哥在操做系統、網絡編程、高併發、高吞吐、高可用性等領域有多年的實踐經驗,並對網絡及編程等方面的新技術有濃厚的興趣。
Rust
歷史不長,仍然處於快速發展的歷程中。關於異步編程的模式,如今已經發展到async/await
協程的高級階段。大概是由於async/await
出現的時間還不長,因此現有大多數的開源項目並非或不是純粹使用async/await
來書寫的,而是前先後後有多種的寫法。這樣的情況給Rust
的學習帶來了一些的難度。在這裏,咱們來捋一捋異步代碼的幾種寫法。git
最原始的方式是使用mio
進行開發。mio
是一個底層異步I/O
庫,提供非阻塞方式的API
,具備很高的性能。實際上mio
是對於操做系統epoll/kqueue/IOCP
的封裝。在C/C++
中咱們使用libevent
之類的庫,mio
能夠理解爲對應的Rust
版本。基於mio
的代碼大體以下:程序員
loop { // Poll Mio for events, blocking until we get an event. poll.poll(&mut events, None)?; // Process each event. for event in events.iter() { if event.is_writable() { // socket可寫,開始發送數據 } if event.is_readable() { // socket可讀,開始接收數據 } // socket 關閉,退出循環 return Ok(()); } }
總的來講,這是徹底基於異步事件通知的寫法,和C/C++
區別不是很大,異步代碼對於程序員是一個挑戰,當代碼邏輯愈來愈複雜,添加新功能或是解決已有問題的難度也愈來愈大。編程
另外,mio
實現的是一個單線程事件循環,雖然能夠處理成千上萬路的I/O
操做,但沒有多線程的能力,須要本身擴充。安全
爲了更好地規範異步的邏輯,Rust
抽象出Future
表示還沒有發生的事物。這些Future
能夠用不少方式組合成一個更復雜的複合Future
來表明一系列的事件。Future
須要程序主動去poll
(輪詢)才能獲取到最終的結果,每一次輪詢的結果多是Ready
或者Pending
。微信
運行庫提供Executor
和Reactor
來執行Future
,也就是調用Future
的poll
方法循環執行一系列就緒的Future
,當Future
返回Pending
的時候,會將Future
轉移到Reactor
上等待喚醒。Reactor
被用來負責喚醒以前沒法完成的Future
。事實上,tokio
的Reactor
是基於mio
實現的,而async-std/smol
則是封裝了epoll/kqueue/IOCP
,提供相似的功能。網絡
手動實現Future
是一件相對繁瑣的工做,主要的問題在於異步模式自己的特性。例如,接收網絡數據,沒法臆測每次輪詢會收到多少字節的數據,每每須要開闢一段接收緩衝區容納數據,協議解碼也須要一個狀態機拼包向上層提交;發送網絡數據存在類似問題,發送數據時底層未就緒,則緩衝發送數據,待下次輪詢時,須要首先檢查並處理髮送緩衝區。另外還有一些值得注意的地方,若是手動實現的Future
返回Pending
,則必須本身實現喚醒機制,也就是須要將cx
克隆一份記下來,而後在適當的時侯調用cx.wake()
。由於網絡相關的功能每每是分層的,所以手動的Poll
循環也會是層層堆疊的,這時候,返回值Poll::Ready(T)
就有學問了。泛型T可能包裹各類不一樣的數據,Option<T>
,Result<T,E>
,或者二者的組合。由於最外層還有一個Poll<T>
,全部這時候的match
語句寫起來會很是臃腫,粘貼複製寫不少代碼,完成的功能卻很是有限,並且因爲這些代碼很類似,大大增長了出錯的可能性。多線程
標準庫中僅僅定義了Future
,更多的相關功能須要引用futures-rs
類庫,裏面定義了一系列有關異步的操做,包括Stream
、Sink
、AsyncRead
、AsyncWrite
等基礎Trait
,以及對應實現了大量方便操做的組合子的Ext Trait
,特別用途的fused
、Box
,Try
系列的擴展,諸如join!
、select!
、pin_mut!
等一系列的宏。理論上,不使用這些擴展也能寫出代碼,只不過那樣的代碼極可能篇幅會長的可怕。值得一提的是,除了一些能夠簡化代碼的過程宏以外,擴展Trait
提供的組合子也會讓代碼精簡很多。好比Future::and_then
可讓代碼寫成鏈式調用的方式;Sink::send
包裝了Sink
發送三步驟 poll_ready/start_send/poll_flush
,使用.await
一行代碼直接就能夠完成發送。所以,不少poll
方式的代碼其實是準確地說是混合式的,其中也使用了很多async
代碼塊。架構
總之,搞清楚Future
相關的這些內容是須要花費很多時間,更不用說用它們來寫代碼了。不過,即使是使用async/await
這種更高級原語,也是有必要了解底層的工做原理和實現機制,所謂知其然知其因此然。併發
使用async/await
能夠將異步的代碼寫得相似同步的過程,更加符合人體工程學。由於async
被翻譯爲一個Future
狀態機,原先在poll
方式中須要處理的與Pending
相關的狀態如今都由async
生成的狀態機自動完成,所以大大減輕了程序員的心智負擔。異步
如前所述,底層的Futures
提供了不少方便的組合子擴展Future
,使用起來很簡潔,能夠極大地簡化代碼。例如,上文提到過的Sink::send
包裝了發送緩衝區的實現和異步發送的三個步驟;AsyncRead::read_exact
實現了讀取指定字節數的功能,在處理網絡協議解析時能夠避免手寫一個拼包狀態機;AsyncWrite::write_all
實現了發送所有數據以及發送緩衝,等等。正是在這些底層功能的支持下,async/await
成爲了更高級的書寫異步代碼的方式。也許會有少量擔憂,這樣所謂「高級」會不會在性能上有很大損失?筆者我的不這麼認爲。自動實現的狀態機也許未必比程序員手動完成的性能更差。狀態機編程對於任何人,即使是一個有經驗的程序員都是不小挑戰。蹩腳的狀態機實現不只可能有性能問題,更大的風險來自於實現上的漏洞,以及維護上的困難。代碼寫出來更可能是給別人看的,完成一樣的功能,簡潔的代碼更有多是更高質量的代碼。
如下例子是固定長度分割的報文接收過程,使用async/await
是很簡單的。若是實現爲一個Stream/poll_next
,代碼會複雜不少。
/// convenient method for reading a whole frame pub async fn recv_frame(&mut self) -> io::Result<Vec<u8>> { let mut len = [0; 4]; let _ = self.inner.read_exact(&mut len).await?; // inner socket, 支持 AsyncRead let n = u32::from_be_bytes(len) as usize; if n > self.max_frame_len { let msg = format!( "data length {} exceeds allowed maximum {}", n, self.max_frame_len ); return Err(io::Error::new(io::ErrorKind::PermissionDenied, msg)); } let mut frame = vec![0; n]; self.inner.read_exact(&mut frame).await?; Ok(frame) }
最後,徹底使用async/await
寫代碼目前還有幾個問題:
當前Trait
不支持 async fn
,沒法直接用Trait
來抽象異步方法。暫時解決辦法是使用三方庫 async-trait
。以下:
use async_trait::async_trait; #[async_trait] trait Advertisement { async fn run(&self); }
宏 async_trait
將代碼轉換爲一個返回 Pin<Box<dyn Future + Send + 'async>>
的同步方法。由於裝箱和動態派發的緣由,性能上會有少量損失。
當前drop
方法必須是同步調用,不能使用await
語法。當一個I/O
對象越過生命週期被析構,每每在關閉底層句柄以前,還須要完成某些I/O
操做。好比,通知網絡對端鏈接已經關閉。在同步代碼中,咱們只須要在drop()
中置入這些操做,可是在異步代碼中,沒法在drop()
中作相似的事情。
解決辦法,老是在異步I/O
對象越過生命週期以前顯式地執行關閉動做,或是,實現一個相似GC
的功能,專門負責清理工做。
筆者在學習Rust
過程當中,主要關注網絡相關的併發編程。由於以前有在Go
版本的ipfs/libp2p
上的開發經驗,故而學習研究了rust-libp2p
以及nervos tentacle
。rust-libp2p
是Parity
實現的準官方版本,可是這個項目的代碼及其難懂,過於強調使用泛型參數的抽象,致使代碼可讀性很是差。請教了代碼做者,他認可代碼可能有些複雜,但也強調都是有緣由的... nervos tentacle
的實如今協議上不夠完整,特別是與標準libp2p
並不兼容。兩個項目共有的特色是主要用poll
的方式寫代碼,邏輯上都是狀態機的嵌套。
所以,筆者試圖徹底使用async/await
方式重構libp2p
,參考rust-libp2p
的實現,代碼協程化,向上層提供純粹的異步接口,爭取在API
層面的體驗接近go-libp2p
,這是推廣Rust
協程機制的一個嘗試,同時也是我的的一個學習的過程。目前剛剛起步,僅完成了secio
與yamux
部分,待合適時機開源,指望更多Rust
愛好者共同來開發完善。
參考:
Asynchronous Destructors
深圳星鏈網科科技有限公司(Netwarps),專一於互聯網安全存儲領域技術的研發與應用,是先進的安全存儲基礎設施提供商,主要產品有去中心化文件系統(DFS)、企業聯盟鏈平臺(EAC)、區塊鏈操做系統(BOS)。 微信公衆號:Netwarps