寫在前面:這是一篇近一年前的草稿了,翻出來發現,關於 Task
(已更名爲 Thread
)退出的一些作法仍然適用,並且 zmq.rs 0.2 不出意外也要用到,因此仍然把這篇寫完貼出來備查。但請注意,文中關於 libgreen
的一些描述已不屬實。html
這一篇隔的時間比較長,期間咱們的遊戲在準備上線,因此也沒時間寫 Rust,着重在課餘時間研讀了各類文檔、源碼和 issue report——關於 Rust 的 I/O。git
Task
開始個人 zmq.rs 項目終於開始碰到網絡操做了。因爲 Rust 給封裝出來的 I/O 接口都是相對於 Task
同步的,因此目前來看還得給每一批 I/O 操做建立一個 Task
。程序員
這裏得多插一點內容了。首先是關於 ZeroMQ,它的一個 socket 是能夠跟不少個別的 ZMQ socket 通信的,並且是能夠經過不一樣的 endpoint 來完成。好比一個 REP 服務端 socket,同時監聽了 8080 和 9090 兩個端口(endpoint),每一個端口可能都有數十個 REQ 客戶端 socket 鏈接上去;更有甚者,這個 REP socket 也能夠主動去鏈接某些客戶端 socket 監聽的 endpoint,上門服務。而在全部這些網絡拓撲結構的上面,一個 REP socket 對程序員的接口是始終一致的,您只須要反覆地從這個 REP socket 讀一個數據包,而後發一個數據包就行了。github
這樣一來呢,我就得給 REP socket 底層的每個 TCP socket 鏈接建立至少一個 Task
,以知足其併發性。以前咱們有提到,Rust 提供了 libgreen
和 libnative
兩種運行時環境,對應了兩種不一樣的 Task
實現模型。對於 zmq.rs 來講,基於目前的阻塞式的 I/O 接口來看,咱們頗有可能須要建立大量的 Task
——這對於 libgreen
的 M:N
模型來講是垂手可得的,但對於 libnative
來講倒是值得商榷的,由於 1:1
的模型意味着咱們將會用大量操做系統的線程來微操極少許的 ZMQ socket,這對於追求高併發的 ZMQ 也許並非一個好主意——雖然 Rust 的 Task
模型能極大地避免常規多線程編程中最糟心的那一部分,可是內存佔用會不會過高(哈!高!-2015),上下文切換的代價會不會太大等問題還有待於進一步測試。若是結果不理想,也許每批 I/O 一個 Task
的這種設計就要被推倒,新的設計將須要 Rust 提供異步的 I/O 接口。(0.2 確實將這麼改 -2015)web
回到原來的話題。由於建立了好多 Task
,必定會碰到的問題就是怎麼結束它們,因此我一上來就打算先看一眼這個問題。沒想到這一看,看出了好多問題。編程
由於受 Python greenlet 的嚴重影響,我天然而然地覺得,結束一個 Task
應該用 kill()
——對於一個暫停狀態的微線程,扔進去一個 GreenletExit
異常是多麼正常的一種方式。但是 Rust 不那麼認爲。我也想到了因爲須要支持 libnative
,中止一個阻塞在 I/O 調用中的線程絕非扔一個 GreenletExit
那麼簡單,但我仍是義無反顧地去搜各類 rust task kill terminate shutdown 之類的關鍵詞。網絡
結果逐步明朗,原來 Rust 在 0.9 以前確實有過 Task
的 kill
功能,是經過 supervisor 模型實現的——即一對 Task
,任何一個掛掉都會致使另外一個掛掉。可是呢,因爲一些緣由這個功能被砍掉了,也就是說,在 Rust 裏,一個 Task
只能從內部拋錯誤死掉,沒有辦法從外部直接殺掉。另外,這個還(竟然!)致使了個人第一個 stackoverflow 回答。多線程
Task
既然沒法從別的 Task
中主動殺掉一個 Task
,那麼就想辦法讓這個 Task
自殺。一個長時間運行的 Task
一般處於兩種狀態:一、執行代碼;二、等待事件。併發
對於一個正在執行代碼的 Task
,咱們是沒法讓 Task
本身突然想到該結束了而後戛然而止。只有在某些特殊狀況下,咱們才能手工寫一些代碼,讓程序執行一段代碼以後,去檢查一下是否應該結束了,好比在一個死循環裏:異步
rustwhile self.running { // Do everything else }
而大多數其餘狀況下,從事件等待中跳出來結束一個 Task
更爲常見。這裏也分兩種狀況:A、等待 I/O 事件;B、等待 channel
事件。兩種狀況處理都比較簡單,A 的話就給調用加一個稍短的超時,而後重複前面的那個例子,好比:
rusttcp_stream.set_read_timeout(Some(1000)); while self.running { let result = tcp_stream.read_byte(); // Do the rest }
而對於等待一個 channel
的 Task
來講就更容易了,只要 channel
的另外一端銷燬了,這個等待的調用就會自動結束,只須要正確處理調用結果就行了。
其實,上面兩個例子中的 self.running
應(至少)爲一個 Arc
,由於須要從別的 Task
中來設置這個值。這裏還有一種也是用 channel
的處理方式,就是在每次循環的開始處,向一個鏈接到父 Task
的 channel
的 Sender
端,發送一個空白消息,這樣若是另外一端已經銷燬了,發送會失敗,也就意味着咱們該退出這個 Task
了,好比這樣:
rustlet (tx, rx) = channel(); // ... in task let mut a = TcpListener::bind("127.0.0.1:8482").listen().unwrap(); a.set_timeout(Some(1000)); loop { match a.accept() { Ok(s) => { tx.send(Some(s)); } Err(ref e) if e.kind == TimedOut => { tx.send(None).unwrap(); } Err(e) => println!("err: {}", e), // something else } }