Rust 語言學習筆記(四)—— I/O

寫在前面:這是一篇近一年前的草稿了,翻出來發現,關於 Task(已更名爲 Thread)退出的一些作法仍然適用,並且 zmq.rs 0.2 不出意外也要用到,因此仍然把這篇寫完貼出來備查。但請注意,文中關於 libgreen 的一些描述已不屬實。html

這一篇隔的時間比較長,期間咱們的遊戲在準備上線,因此也沒時間寫 Rust,着重在課餘時間研讀了各類文檔、源碼和 issue report——關於 Rust 的 I/O。git

從試圖殺掉一個 Task 開始

個人 zmq.rs 項目終於開始碰到網絡操做了。因爲 Rust 給封裝出來的 I/O 接口都是相對於 Task 同步的,因此目前來看還得給每一批 I/O 操做建立一個 Task程序員

這裏得多插一點內容了。首先是關於 ZeroMQ,它的一個 socket 是能夠跟不少個別的 ZMQ socket 通信的,並且是能夠經過不一樣的 endpoint 來完成。好比一個 REP 服務端 socket,同時監聽了 8080 和 9090 兩個端口(endpoint),每一個端口可能都有數十個 REQ 客戶端 socket 鏈接上去;更有甚者,這個 REP socket 也能夠主動去鏈接某些客戶端 socket 監聽的 endpoint,上門服務。而在全部這些網絡拓撲結構的上面,一個 REP socket 對程序員的接口是始終一致的,您只須要反覆地從這個 REP socket 讀一個數據包,而後發一個數據包就行了。github

這樣一來呢,我就得給 REP socket 底層的每個 TCP socket 鏈接建立至少一個 Task,以知足其併發性。以前咱們有提到,Rust 提供了 libgreenlibnative 兩種運行時環境,對應了兩種不一樣的 Task 實現模型。對於 zmq.rs 來講,基於目前的阻塞式的 I/O 接口來看,咱們頗有可能須要建立大量的 Task——這對於 libgreenM:N 模型來講是垂手可得的,但對於 libnative 來講倒是值得商榷的,由於 1:1 的模型意味着咱們將會用大量操做系統的線程來微操極少許的 ZMQ socket,這對於追求高併發的 ZMQ 也許並非一個好主意——雖然 Rust 的 Task 模型能極大地避免常規多線程編程中最糟心的那一部分,可是內存佔用會不會過高(哈!高!-2015),上下文切換的代價會不會太大等問題還有待於進一步測試。若是結果不理想,也許每批 I/O 一個 Task 的這種設計就要被推倒,新的設計將須要 Rust 提供異步的 I/O 接口。(0.2 確實將這麼改 -2015web

回到原來的話題。由於建立了好多 Task,必定會碰到的問題就是怎麼結束它們,因此我一上來就打算先看一眼這個問題。沒想到這一看,看出了好多問題。編程

由於受 Python greenlet 的嚴重影響,我天然而然地覺得,結束一個 Task 應該用 kill()——對於一個暫停狀態的微線程,扔進去一個 GreenletExit 異常是多麼正常的一種方式。但是 Rust 不那麼認爲。我也想到了因爲須要支持 libnative,中止一個阻塞在 I/O 調用中的線程絕非扔一個 GreenletExit 那麼簡單,但我仍是義無反顧地去搜各類 rust task kill terminate shutdown 之類的關鍵詞。網絡

結果逐步明朗,原來 Rust 在 0.9 以前確實有過 Taskkill 功能,是經過 supervisor 模型實現的——即一對 Task,任何一個掛掉都會致使另外一個掛掉。可是呢,因爲一些緣由這個功能被砍掉了,也就是說,在 Rust 裏,一個 Task 只能從內部拋錯誤死掉,沒有辦法從外部直接殺掉。另外,這個還(竟然!)致使了個人第一個 stackoverflow 回答多線程

正確結束一個 Task

既然沒法從別的 Task 中主動殺掉一個 Task,那麼就想辦法讓這個 Task 自殺。一個長時間運行的 Task 一般處於兩種狀態:一、執行代碼;二、等待事件。併發

對於一個正在執行代碼的 Task,咱們是沒法讓 Task 本身突然想到該結束了而後戛然而止。只有在某些特殊狀況下,咱們才能手工寫一些代碼,讓程序執行一段代碼以後,去檢查一下是否應該結束了,好比在一個死循環裏:異步

rustwhile self.running {
    // Do everything else
}

而大多數其餘狀況下,從事件等待中跳出來結束一個 Task 更爲常見。這裏也分兩種狀況:A、等待 I/O 事件;B、等待 channel 事件。兩種狀況處理都比較簡單,A 的話就給調用加一個稍短的超時,而後重複前面的那個例子,好比:

rusttcp_stream.set_read_timeout(Some(1000));
while self.running {
    let result = tcp_stream.read_byte();
    // Do the rest
}

而對於等待一個 channelTask 來講就更容易了,只要 channel 的另外一端銷燬了,這個等待的調用就會自動結束,只須要正確處理調用結果就行了。

其實,上面兩個例子中的 self.running 應(至少)爲一個 Arc,由於須要從別的 Task 中來設置這個值。這裏還有一種也是用 channel 的處理方式,就是在每次循環的開始處,向一個鏈接到父 TaskchannelSender 端,發送一個空白消息,這樣若是另外一端已經銷燬了,發送會失敗,也就意味着咱們該退出這個 Task 了,好比這樣:

rustlet (tx, rx) = channel();

// ... in task
let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap();
a.set_timeout(Some(1000));
loop {
    match a.accept() {
        Ok(s) => { tx.send(Some(s)); }
        Err(ref e) if e.kind == TimedOut => { tx.send(None).unwrap(); }
        Err(e) => println!("err: {}", e), // something else
    }
}
相關文章
相關標籤/搜索