轉自:https://www.cnblogs.com/hymenz/p/9334297.htmlhtml
緣起
在許多編程語言裏,咱們都很是樂於去研究在這個語言中所使用的異步網絡編程的框架,好比說Python的 Gevent、asyncio,Nginx 和 OpenResty,Go 等,今年年初我開始接觸 Rust,並被其無 GC、內存安全、極小的運行時等特性所吸引,通過一段時間的學習,開始尋找構建實際項目的解決方案,很快 mio、tokio 等框架進入了個人視野,因而開始從更加底層的 mio 出發實驗。react
https://github.com/Hevienz/mio_test/blob/master/src/main.rslinux
能夠看到 mio 是一個很是底層的異步編程的框架,這意味着若是咱們要在實際的項目開發中使用它時,就不得不從 event loop 開始編寫咱們的軟件,這並非咱們所指望的,因而咱們須要一個更高層次抽象的框架,這即是本文要爲你們講述的 tokio。git
tokio
tokio 是 Rust 中的異步編程框架,它將複雜的異步編程抽象爲 Futures、Tasks 和 Executor,並提供了 Timers 等基礎設施,下文中咱們將一一展開。github
運行時模型
tokio 是一個基於輪訓的模型。好比咱們要在 tokio 上調度咱們的 task,咱們須要爲其實現 Future
trait。好比下面的例子中,咱們想要獲得一個 widget,但它有可能尚未準備好,這時候咱們調用 poll 的結果就是 Ok(Async::NotReady)
,Executor 會負責重複的調用 poll
,直到 widget 準備好,返回Ok(Async::Ready(()))
。數據庫
/// A task that polls a single widget and writes it to STDOUT. pub struct MyTask; impl Future for MyTask { type Item = (); type Error = (); fn poll(&mut self) -> Result<Async<()>, ()> { match poll_widget() { Async::Ready(widget) => { println!("widget={:?}", widget); Ok(Async::Ready(())) } Async::NotReady => { return Ok(Async::NotReady); } } } }
在最簡單的狀況下,Executor 可能會長這樣。(注:這不是真實的實現,只是用來講明概念)express
pub struct SpinExecutor { tasks: VecDeque<Box<Future<Item = (), Error = ()>>>, } impl SpinExecutor { pub fn spawn<T>(&mut self, task: T) where T: Future<Item = (), Error = ()> + 'static { self.tasks.push_back(Box::new(task)); } pub fn run(&mut self) { while let Some(mut task) = self.tasks.pop_front() { match task.poll().unwrap() { Async::Ready(_) => {} Async::NotReady => { self.tasks.push_back(task); } } } } }
Executor 頻繁地輪詢全部 task,即便某些 task 仍然會以 NotReady
返回。
理想狀況下,Executor 應該能夠經過某種方式知道哪些 task 剛好轉變爲 「就緒」 狀態。這正是 futures 任務模型的核心。編程
Futures
future 是對一個將來事件的抽象。好比你能夠將各類事件抽象爲 future:緩存
- 在線程池中執行的數據庫查詢。當數據庫查詢完成時,future 完成,其值是查詢的結果。
- 對服務器的 RPC 調用。當服務器回覆時,future 完成,其值是服務器的響應。
- 超時事件。當時間到了,future 就完成了,它的值是
()
。 - 在線程池上運行的長時間運行的 CPU 密集型任務。任務完成後,future 完成,其值爲任務的返回值。
這裏咱們舉一個例子:安全
extern crate futures; extern crate tokio; extern crate tokio_core; use std::error::Error; use futures::Future; use futures::future::{ok, done}; use tokio_core::reactor::Core; fn my_fn_squared(i: u32) -> Result<u32, Box<Error>> { Ok(i * i) } fn my_fut_squared(i: u32) -> impl Future<Item = u32, Error = Box<Error + 'static>> { ok(i * i) } fn my_fut() -> impl Future<Item = u32, Error = Box<Error + 'static>> { ok(10) } fn main() { let mut reactor = Core::new().unwrap(); let chained_future = my_fut().and_then(|retval| { done(my_fn_squared(retval)).and_then(|retval2| my_fut_squared(retval2)) }); let retval3 = reactor.run(chained_future).unwrap(); println!("{:?}", retval3); }
這裏,咱們的 my_fut
的返回值實現了 Future,咱們知道它被 Executor 執行完成後,會返回一個 u32
或者 一個 Box<Error + 'static>
,而如今咱們就能夠經過 .and_then
來處理這個 u32
的值,而最終咱們將咱們的 future 連接了起來,交給 Executor 執行。
Tasks
Tasks 是應用程序的 「邏輯單元」。他們以 Future trait 來表示。一旦 task 完成處理,task 的 future 實現將以值 ()
返回。
Tasks 被傳遞給 Executor,Executor 處理 task 的調度。Executor 一般在一組或一組線程中調度許多 task。task 不得執行計算繁重的邏輯,不然將阻止其餘 task 執行。
Tasks 既能夠經過實現 Future trait 來實現,也能夠經過使用 futures
和 tokio
crates 中的各類組合器函數來構建 future 來實現。
I/O
tokio
crate 也提供了 TCP、UDP 的支持,不像 std
中的實現,tokio 的網絡類型是基於 poll 模型的,而且當他們的 「就緒」 狀態改變時會通知 task executors。在 tokio::net
模塊中你將會找到像 TcpListener、TcpStream、UdpSocket 這些類型。
全部這些類型都提供了 future
的 API 以及 poll
API。
Tokio 網絡類型被一個基於 mio
的 reactor 所驅動,默認狀況下,它在後臺線程上啓動。
使用 future API
一些幫助使用 future API 的函數包括:
incoming
:入站 TCP 鏈接的 Stream。read_exact
:將n
字節準確讀入緩衝區。read_to_end
:將全部字節讀入緩衝區。write_all
:寫緩衝區的所有內容。copy
:將字節從一個 I/O 句柄複製到另外一個。
這些函數中的許多都是源於 AsyncRead
和 AsyncWrite
trait 的。這些 trait 相似於 std
中的 Read
和 Write
,但僅僅用於具備 future aware
的類型,例如符合下面的特徵:
- 調用
read
或write
是非阻塞的,他們從不阻塞調用線程。 - 若是一個調用會以其餘方式阻塞,那麼會返回一個錯誤 WouldBlock。若是發生這種狀況,則當前 future 的task 將在 I/O 再次準備就緒時被調度。
注意 AsyncRead
和 AsyncWrite
類型的用戶應該使用 poll_read
和 poll_write
代替直接調用 read
和 write
。
例如,如下是如何接受鏈接,從中讀取5個字節,而後將5個字節寫回 socket 的例子:
let server = listener.incoming().for_each(|socket| { println!("accepted socket; addr={:?}", socket.peer_addr().unwrap()); let buf = vec![0; 5]; let connection = io::read_exact(socket, buf) .and_then(|(socket, buf)| { io::write_all(socket, buf) }) .then(|_| Ok(())); // Just discard the socket and buffer // Spawn a new task that processes the socket: tokio::spawn(connection); Ok(()) })
使用 Poll API
當手動實現 Future 時,須要使用基於 Poll 的 API,而且你須要返回 Async
。當您須要實現本身的處理自定義邏輯的組合器時,這很是有用。
例如,這就是如何爲 TcpStream 實現 read_exact
future 的例子。
pub struct ReadExact { state: State, } enum State { Reading { stream: TcpStream, buf: Vec<u8>, pos: usize, }, Empty, } impl Future for ReadExact { type Item = (TcpStream, Vec<u8>); type Error = io::Error; fn poll(&mut self) -> Result<Async<Self::Item>, io::Error> { match self.state { State::Reading { ref mut stream, ref mut buf, ref mut pos } => { while *pos < buf.len() { let n = try_ready!({ stream.poll_read(&mut buf[*pos..]) }); *pos += n; if n == 0 { let err = io::Error::new( io::ErrorKind::UnexpectedEof, "early eof"); return Err(err) } } } State::Empty => panic!("poll a ReadExact after it's done"), } match mem::replace(&mut self.state, State::Empty) { State::Reading { stream, buf, .. } => { Ok(Async::Ready((stream, buf))) } State::Empty => panic!(), } } }
數據報
UdpSocket 類型提供了許多方便的方法:
send_dgram
容許您將發送數據報做爲 future,若是沒法當即發送整個數據報,則返回錯誤。recv_dgram
表示將數據報讀入緩衝區。
示例
#[macro_use] extern crate log; extern crate futures; extern crate pretty_env_logger; extern crate tokio; use futures::future::{done, ok}; use futures::{Future, Stream}; use tokio::io::{self as tio, AsyncRead}; use tokio::net::{TcpListener, TcpStream}; use std::error; use std::fmt; use std::io; fn client_fut(socket: TcpStream) -> impl Future<Item = (), Error = ()> + 'static + Send { futures::lazy(move || match socket.peer_addr() { Ok(peer) => { info!("Tcp connection [{:?}] connected to server", peer); Ok((socket, peer)) } Err(err) => { error!("Fetch peer address failed: {:?}", err); Err(()) } }).and_then(move |(socket, peer)| { let buf = vec![0; 5]; let svc_fut = tio::read_exact(socket, buf) .and_then(|(socket, buf)| { tio::write_all(socket, buf) }) .then(|_| Ok(())); tokio::spawn(svc_fut); ok(()) }) } fn server_fut(listener: TcpListener) -> impl Future<Item = (), Error = ()> + 'static + Send { listener .incoming() .for_each(|socket| { tokio::spawn(client_fut(socket)); Ok(()) }) .map_err(|err| { error!("Accept connection failed: {:?}", err); }) } fn run() -> Result<(), io::Error> { let addr = "127.0.0.1:1234".parse().unwrap(); info!("Listening on {:?}", addr); let listener = TcpListener::bind(&addr)?; let server_fut = server_fut(listener); tokio::run(server_fut); Ok(()) } fn print<T: fmt::Debug, E: error::Error>(result: Result<T, E>) { match result { Ok(any) => info!("Result: {:?}", any), Err(err) => error!("Error: {:?}", err), } } fn init() { pretty_env_logger::init(); } fn main() { init(); print(run()); }
Timers
在編寫基於網絡的應用程序時,一般須要根據時間執行操做。
- 在一段時間後運行一些代碼。
- 取消運行時間過長的運行操做。
- 以必定間隔重複執行操做。
這些用例經過使用 timer
模塊中提供的各類計時器 API 來處理。
延遲運行代碼
在這個例子中,咱們但願在一段時間後執行任務。爲此,咱們使用 Delay
API。咱們要作的只是將 "Hello world!"
寫到終端。
use tokio::prelude::*; use tokio::timer::Delay; use std::time::{Duration, Instant}; fn main() { let when = Instant::now() + Duration::from_millis(100); let task = Delay::new(when) .and_then(|_| { println!("Hello world!"); Ok(()) }) .map_err(|e| panic!("delay errored; err={:?}", e)); tokio::run(task); }
爲長時間運行的操做設置 Timeout
在編寫健壯的網絡應用程序時,確保在合理的時間內完成操做相當重要。在等待來自外部的,不受信任的來源的數據時尤爲如此。
該 Deadline
類型確保操做在固定的時間內完成。
use tokio::io; use tokio::net::TcpStream; use tokio::prelude::*; use std::time::{Duration, Instant}; fn read_four_bytes(socket: TcpStream) -> Box<Future<Item = (TcpStream, Vec<u8>), Error = ()>> { // The instant at which the read will be aborted if // it has not yet completed. let when = Instant::now() + Duration::from_secs(5); let buf = vec![0; 4]; let fut = io::read_exact(socket, buf) .deadline(when) .map_err(|_| println!("failed to read 4 bytes by deadline")); Box::new(fut) }
週期性運行代碼
在一個時間間隔內重複運行代碼對於在套接字上發送 PING 消息,或常常檢查配置文件等狀況頗有用。
Interval
類型實現了 Stream
,並以指定的速率掛起。
use tokio::prelude::*; use tokio::timer::Interval; use std::time::{Duration, Instant}; fn main() { let task = Interval::new(Instant::now(), Duration::from_millis(100)) .take(10) .for_each(|instant| { println!("fire; instant={:?}", instant); Ok(()) }) .map_err(|e| panic!("interval errored; err={:?}", e)); tokio::run(task); }
計時器的注意事項
Tokio 計時器的粒度爲 1 毫秒。任何更小的間隔都會向上舍入到最接近的毫秒。定時器在用戶域中實現(即不使用操做系統定時器,像 linux 上的 timerfd)。它使用分層散列計時器輪實現,在建立,取消和觸發超時時提供有效的恆定時間複雜度。
Tokio 運行時包括每一個工做線程一個計時器實例。這意味着,若是運行時啓動4個工做線程,則將有4個計時器實例。這在大多數狀況下避免了同步,由於當使用計時器時,任務將在位於當前線程上的狀態下操做。
也就是說,計時器實現是線程安全的,並支持從任何線程使用。
基本組合器
下面是關於 Future 的圖表,來自於 Cheatsheet for Futures 。
// Constructing leaf futures fn empty () -> Future<T, E> fn ok (T) -> Future<T, E> fn err (E) -> Future<T, E> fn result(Result<T, E>) -> Future<T, E> // General future constructor fn poll_fn(FnMut(thread_local!(Task)) -> Poll<T, E>) -> Future<T, E> // Mapping futures fn Future::map (Future<T, E>, FnOnce(T) -> U) -> Future<U, E> fn Future::map_err (Future<T, E>, FnOnce(E) -> F) -> Future<T, F> fn Future::from_err(Future<T, Into<E>>) -> Future<T, E> // Chaining (sequencing) futures fn Future::then (Future<T, E>, FnOnce(Result<T, E>) -> IntoFuture<U, F>) -> Future<U, F> fn Future::and_then(Future<T, E>, FnOnce(T) -> IntoFuture<U, E>) -> Future<U, E> fn Future::or_else (Future<T, E>, FnOnce(E) -> IntoFuture<T, F>) -> Future<T, F> fn Future::flatten (Future<Future<T, E>, Into<E>>) -> Future<T, E> // Joining (waiting) futures fn Future::join (Future<T, E>, IntoFuture<U, E>) -> Future<(T, U), E> fn Future::join3(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>) -> Future<(T, U, V), E> fn Future::join4(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>, IntoFuture<W, E>) -> Future<(T, U, V, W), E> fn Future::join5(Future<T, E>, IntoFuture<U, E>, IntoFuture<V, E>, IntoFuture<W, E>, IntoFuture<X, E>) -> Future<(T, U, V, W, X), E> fn join_all (IntoIterator<IntoFuture<T, E>>) -> Future<Vec<T>, E> // Selecting (racing) futures fn Future::select (Future<T, E>, IntoFuture<T, E>) -> Future<(T, Future<T, E>), (E, Future<T, E>)> fn Future::select2(Future<T, E>, IntoFuture<U, F>) -> Future<Either<(T, Future<U, F>), (U, Future<T, E>)>, Either<(E, Future<U, F>), (F, Future<T, E>)>> fn select_all (IntoIterator<IntoFuture<T, E>>) -> Future<(T, usize, Vec<Future<T, E>>), (E, usize, Vec<Future<T, E>>)> fn select_ok (IntoIterator<IntoFuture<T, E>>) -> Future<(T, Vec<Future<T, E>>), E> // Utility fn lazy (FnOnce() -> IntoFuture<T, E>) -> Future<T, E> fn loop_fn (S, FnMut(S) -> IntoFuture<Loop<T, S>, E>) -> Future<T, E> // Miscellaneous fn Future::into_stream (Future<T, E>) -> Stream<T, E> fn Future::flatten_stream(Future<Stream<T, E>, E>) -> Stream<T, E> fn Future::fuse (Future<T, E>) -> Future<T, E> fn Future::catch_unwind (Future<T, E>+UnwindSafe) -> Future<Result<T, E>, Any+Send> fn Future::shared (Future<T, E>) -> Future<SharedItem<T>, SharedError<E>>+Clone fn Future::wait (Future<T, E>) -> Result<T, E>
這部分的內容推薦參考這篇文章,https://www.jianshu.com/p/5059c403a335。
本文再也不贅述。
返回 futures
在使用 futures 時,您可能須要作的第一件事就是返回一個 Future
。這有幾種選擇,從最符合人體工程學到最不符合。
- Trait 對象
impl Trait
Trait 對象
首先,您始終能夠選擇返回一個 boxed trait 對象
:
fn foo() -> Box<Future<Item = u32, Error = io::Error>> { // ... }
這個策略的好處是它很容易寫出來而且易於建立。
這種方法的缺點是,在構建 future 時須要運行時分配,在使用該 future 時須要動態分派。Box
須要在堆上分配而 future 會被置入其中。
一般能夠經過僅在您想要返回的 future 鏈的最後來 Boxing
來減小分配。
impl Trait
在 Rust 1.26 版本以後(2018年5月7日發佈),咱們可使用叫作 impl Trait
的新的語言特性。
fn add_10<F>(f: F) -> impl Future<Item = i32, Error = F::Error> where F: Future<Item = i32>, { f.map(|i| i + 10) }
這種方法的好處在於它是零開銷的,再也不須要 Box
。
使用 framed streams
Tokio 有幫助函數將字節流轉換爲幀流。字節流的例子包括 TCP 鏈接,管道,文件對象以及標準輸入和輸出。在Rust中,streams 很容易識別,由於它們實現了 Read
和 Write
trait。
最簡單的幀化的消息形式之一是行分隔消息。每條消息都以一個 \n
字符結尾。讓咱們看一下如何使用 tokio 實現行分隔消息流。
編寫編解碼器
編解碼器實現 tokio_codec::Decoder
和 tokio_codec::Encoder
trait。他的工做就是將字節轉爲幀以及相反。這些 trait
與 tokio_codec::Framed
struct一塊兒使用,以提供字節流的緩衝,解碼和編碼。
讓咱們看一下LinesCodec
struct 的簡化版本,它實現了行分隔消息的解碼和編碼。
pub struct LinesCodec { // Stored index of the next index to examine for a `\n` character. // This is used to optimize searching. // For example, if `decode` was called with `abc`, it would hold `3`, // because that is the next index to examine. // The next time `decode` is called with `abcde\n`, the method will // only look at `de\n` before returning. next_index: usize, }
這裏的註釋解釋了,因爲字節被緩存直到找到一行,所以每次接收數據時從緩衝區的開頭搜索 \n
是浪費的。保存緩衝區的最後長度並在收到新數據時從那裏開始搜索將更有效。
Decoder::decode
是在底層流上接收到數據時調用的方法。該方法能夠生成幀或返回 Ok(None)
以表示它須要更多數據來生成幀。該 decode 方法負責經過使用 BytesMut 的方法將再也不須要緩衝的數據刪除。若是數據未刪除,緩衝區將持續增加。
讓咱們來看看如何爲 LinesCodec
實現 Decoder::decode
。
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> { // Look for a byte with the value '\n' in buf. Start searching from the search start index. if let Some(newline_offset) = buf[self.next_index..].iter().position(|b| *b == b'\n') { // Found a '\n' in the string. // The index of the '\n' is at the sum of the start position + the offset found. let newline_index = newline_offset + self.next_index; // Split the buffer at the index of the '\n' + 1 to include the '\n'. // `split_to` returns a new buffer with the contents up to the index. // The buffer on which `split_to` is called will now start at this index. let line = buf.split_to(newline_index + 1); // Trim the `\n` from the buffer because it's part of the protocol, // not the data. let line = &line[..line.len() - 1]; // Convert the bytes to a string and panic if the bytes are not valid utf-8. let line = str::from_utf8(&line).expect("invalid utf8 data"); // Set the search start index back to 0. self.next_index = 0; // Return Ok(Some(...)) to signal that a full frame has been produced. Ok(Some(line.to_string())) } else { // '\n' not found in the string. // Tell the next call to start searching after the current length of the buffer // since all of it was scanned and no '\n' was found. self.next_index = buf.len(); // Ok(None) signifies that more data is needed to produce a full frame. Ok(None) } }
當須要將幀寫入下層流時,Encoder::encode
方法被調用。幀必須寫入緩衝區並做爲一個參數。寫入緩衝區的數據將在準備好發送數據時寫入流。
如今讓咱們來看看如何爲 LinesCodec
實現 Encoder::encode
。
fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> { // It's important to reserve the amount of space needed. The `bytes` API // does not grow the buffers implicitly. // Reserve the length of the string + 1 for the '\n'. buf.reserve(line.len() + 1); // String implements IntoBuf, a trait used by the `bytes` API to work with // types that can be expressed as a sequence of bytes. buf.put(line); // Put the '\n' in the buffer. buf.put_u8(b'\n'); // Return ok to signal that no error occured. Ok(()) }
編碼信息一般更簡單。這裏咱們只需保留所需的空間並將數據寫入緩衝區。
使用編解碼器
使用編解碼器的最簡單方法是使用 Framed
結構體。它是實現自動緩衝的編解碼器的包裝器。該 Framed
結構體既是 Stream
也是 Sink
。所以,您能夠從中接收幀並向其發送幀。
您可使用任何實現了 AsyncRead
和 AsyncWrite
trait 的類型,使用 AsyncRead::framed
方法建立一個 Framed
結構體。
TcpStream::connect(&addr).and_then(|sock| { let framed_sock = sock.framed(LinesCodec::new()); framed_sock.for_each(|line| { println!("Received line {}", line); Ok(()) }) });