Rust async/.await內幕

在這個教程中咱們將詳細分析rust異步代碼async/.await的內部運行機制。咱們將使用async-std庫而不是tokio,由於這是第一個支持async/.await語法的rust庫。async/.await原理解析教程分爲兩部分,這是第一部分。php

區塊鏈開發教程連接:以太坊 | 比特幣 | EOS | Tendermint | Hyperledger Fabric | Omni/USDT | Ripple編程

0、準備Rust練習環境

首先讓咱們先建立一個Cargo項目:併發

~$ cargo new --bin sleepus-interruptus

若是你指望和教程使用的編譯器保持一致,能夠添加一個內容爲1.39.0的rust-toolchain文件。less

在繼續下面的內容以前,先運行cargo run確保環境沒有問題。異步

一、一個交替顯示的Rust程序

咱們要寫一個簡單的程序,它能夠顯示10次Sleepus消息,每次間隔0.5秒;同時顯示5次Interruptus消息,每次間隔1秒。下面是至關簡單的rust實現代碼:async

use std::thread::{sleep};
use std::time::Duration;

fn sleepus() {
    for i in 1..=10 {
        println!("Sleepus {}", i);
        sleep(Duration::from_millis(500));
    }
}

fn interruptus() {
    for i in 1..=5 {
        println!("Interruptus {}", i);
        sleep(Duration::from_millis(1000));
    }
}

fn main() {
    sleepus();
    interruptus();
}

不過,上面的代碼會同步執行兩個操做,它會先顯示完全部的Sleepus消息,而後再顯示Interruptus消息。而咱們指望的是這兩種消息交織顯示,也就是說Interruptus消息能夠打斷Sleepus消息的顯示。ide

有兩個辦法能夠實現交織顯示的目標。顯而易見的一個是爲每一個函數建立一個單獨的線程,而後等待線程執行完畢。異步編程

use std::thread::{sleep, spawn};

fn main() {
    let sleepus = spawn(sleepus);
    let interruptus = spawn(interruptus);

    sleepus.join().unwrap();
    interruptus.join().unwrap();
}

須要指出的是:函數

  • 咱們使用spawn(sleepus)而不是spawn(sleepus())來建立線程。後者將 當即執行sleepus()而後將其執行結果傳給spawn,這不是咱們指望的- 我在主函數種使用join()來等待子線程結束,並使用unwrap()來處理 能夠發生的故障,由於我懶。

另外一種實現方法是建立一個輔助線程,而後在主線程種調用其中一個函數:oop

fn main() {
    let sleepus = spawn(sleepus);
    interruptus();

    sleepus.join().unwrap();
}

這種方法效率更高,由於只須要額外建立一個線程,而且也沒有什麼反作用,所以我推薦使用這個方法。

不過這兩種方法都不是異步解決方案!咱們使用兩個由操做系統管理的線程來併發執行兩個同步任務!接下來讓咱們嘗試如何在單一線程內讓兩個任務協做執行!

二、用Rust異步async/.await實現交替顯示程序

咱們將從較高層次的抽象開始,而後逐步深刻rust異步編程的細節。如今讓咱們以async風格重寫前面的應用。

首先在Cargo.toml中添加如下依賴:

async-std = { version = "1.2.0", features = ["attributes"] }

如今咱們能夠將應用重寫爲:

use async_std::task::{sleep, spawn};
use std::time::Duration;

async fn sleepus() {
    for i in 1..=10 {
        println!("Sleepus {}", i);
        sleep(Duration::from_millis(500)).await;
    }
}

async fn interruptus() {
    for i in 1..=5 {
        println!("Interruptus {}", i);
        sleep(Duration::from_millis(1000)).await;
    }
}

#[async_std::main]
async fn main() {
    let sleepus = spawn(sleepus());
    interruptus().await;

    sleepus.await;
}

主要的修改說明以下:

  • 咱們再也不使用std::thread中的sleep和spawn函數,而是採用async_std::task。- 在sleepus和interruptus函數前都加async
  • 在調用sleep以後,咱們補充了.await。注意不是.await()調用,而是一個新語法
  • 在主函數上使用#[async_std::main]屬性
  • 主函數前也有async關鍵字
  • 咱們如今使用spawn(sleepus())而不是spawn(sleepus),這表示直接調用sleepus 並將結果傳給spawn
  • 對interruptus()的調用增長.await
  • 對sleepus再也不使用join(),而是改用.await語法

看起來有不少修改,不過實際上,咱們的代碼結構和以前的版本基本是一致的。如今程序運行和咱們的指望一致:採用單一線程進行無阻塞調用。

接下來讓咱們分析上述修改到底意味着什麼。

三、async關鍵字的做用

在函數定義前添加async主要作了如下3個事:

  • 這將容許你在函數體內使用.await語法。咱們接下來會深刻探討這一點
  • 它修改了函數的返回類型。async fn foo() -> Bar 實際上返回的是 impl std::future::Future<Output=Bar>
  • 它自動將結果值封裝進一個新的Future對象。咱們下面會詳細展現這一點

如今讓咱們展開說明第2點。在Rust的標準庫中有一個名爲Future的trait,Future有一個關聯類型Output。這個trait的意思是:我承諾當我完成任務時,會給你一個類型爲Output的值。例如你能夠想象一個異步HTTP客戶端可能會這樣實現:

impl HttpRequest {
    fn perform(self) -> impl Future<Output=HttpResponse> { ... }
}

在發送HTTP請求時須要一些無阻塞的I/O,咱們並不但願阻塞調用線程,可是須要最終獲得響應結果。

async fn sleepus()的結果類型隱含爲()。所以咱們的Future的Output也應該爲()。這意味着咱們須要修改函數爲:

fn sleepus() -> impl std::future::Future<Output=()>

不過若是隻修改這裏,編譯就會出現以下錯誤:

error[E0728]: `await` is only allowed inside `async` functions and blocks
 --> src/main.rs:7:9
  |
4 | fn sleepus() -> impl std::future::Future<Output=()> {
  |    ------- this is not `async`
...
7 |         sleep(Duration::from_millis(500)).await;
  |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks

error[E0277]: the trait bound `(): std::future::Future` is not satisfied
 --> src/main.rs:4:17
  |
4 | fn sleepus() -> impl std::future::Future<Output=()> {
  |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::future::Future` is not implemented for `()`
  |
  = note: the return type of a function must have a statically known size

第一個錯誤信息很直接:你只能在async函數或代碼塊中使用.await語法。咱們尚未接觸到異步代碼塊,不過看起來就是這樣:

async {
    // async noises intensify
}

第二個錯誤消息就是第一個的結果:async關鍵字要求函數返回類型是impl Future。若是沒有這個關鍵字,咱們的loop結果類型是(),這顯然不知足要求。

將整個函數體用一個異步代碼塊包裹起來就解決問題了:

fn sleepus() -> impl std::future::Future<Output=()> {
    async {
        for i in 1..=10 {
            println!("Sleepus {}", i);
            sleep(Duration::from_millis(500)).await;
        }
    }
}

四、.await語法的做用

可能咱們並不須要全部這些async/.await。若是咱們移除sleepus的.await會怎麼樣?使人吃驚的是,竟然編譯經過了,雖然給出了一個警告:

warning: unused implementer of `std::future::Future` that must be used
 --> src/main.rs:8:13
  |
8 |             sleep(Duration::from_millis(500));
  |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_must_use)]` on by default
  = note: futures do nothing unless you `.await` or poll them

咱們在生成一個Future值但沒有使用它。若是查看程序的輸出,你能夠理解編譯器的警告是什麼意思了:

Interruptus 1
Sleepus 1
Sleepus 2
Sleepus 3
Sleepus 4
Sleepus 5
Sleepus 6
Sleepus 7
Sleepus 8
Sleepus 9
Sleepus 10
Interruptus 2
Interruptus 3
Interruptus 4
Interruptus 5

咱們全部的Sleepus消息輸出都沒有延遲。問題在於對sleep的調用實際上沒有讓當前線程休息,它只是生成一個實現了Future的值,而後當承諾最終實現時,咱們知道的確發生了延遲。可是因爲咱們簡單地忽略了Future,所以實際上沒有利用延遲。

爲了理解.await語法到底作了什麼,咱們接下來直接使用Future值來實現咱們的函數。首先從不用async塊開始。

五、不使用async關鍵字的Rust異步代碼

若是咱們丟掉async代碼塊,看起來就是這樣:

fn sleepus() -> impl std::future::Future<Output=()> {
    for i in 1..=10 {
        println!("Sleepus {}", i);
        sleep(Duration::from_millis(500));
    }
}

這樣編譯會出現如下錯誤:

error[E0277]: the trait bound `(): std::future::Future` is not satisfied
 --> src/main.rs:4:17
  |
4 | fn sleepus() -> impl std::future::Future<Output=()> {
  |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::future::Future` is not implemented for `()`
  |

上面錯誤是因爲for循環的結果類型爲(),它沒有實現Future這個trait。修復這個問題的一種辦法是在for循環後面加一句話使其返回Future的實現類型。咱們已經知道能夠用這個:sleep:

fn sleepus() -> impl std::future::Future<Output=()> {
    for i in 1..=10 {
        println!("Sleepus {}", i);
        sleep(Duration::from_millis(500));
    }
    sleep(Duration::from_millis(0))
}

如今咱們依然會看到在for循環內存在未使用的Future值的警告信息,不過返回值那個錯誤已經解決掉了。這個sleep調用實際上什麼也沒作,咱們能夠將其替換爲一個真正的佔位Future:

fn sleepus() -> impl std::future::Future<Output=()> {
    for i in 1..=10 {
        println!("Sleepus {}", i);
        sleep(Duration::from_millis(500));
    }
    async_std::future::ready(())
}

六、實現本身的Future

爲了打破沙鍋問到底,讓咱們再深刻一步,不適用async_std庫中的ready函數,而是定義本身的實現Future的結構。讓咱們稱之爲DoNothing。

use std::future::Future;

struct DoNothing;

fn sleepus() -> impl Future<Output=()> {
    for i in 1..=10 {
        println!("Sleepus {}", i);
        sleep(Duration::from_millis(500));
    }
    DoNothing
}

問題在於DoNothing尚未提供Future實現。咱們接下來將進行一些編譯器驅動的開發,讓rustc告訴咱們如何修復這個程序。第一個錯誤信息是:

the trait bound `DoNothing: std::future::Future` is not satisfied

所以讓咱們補上這個trait的實現:

impl Future for DoNothing {
}

繼續報錯:

error[E0046]: not all trait items implemented, missing: `Output`, `poll`
 --> src/main.rs:7:1
  |
7 | impl Future for DoNothing {
  | ^^^^^^^^^^^^^^^^^^^^^^^^^ missing `Output`, `poll` in implementation
  |
  = note: `Output` from trait: `type Output;`
  = note: `poll` from trait: `fn(std::pin::Pin<&mut Self>, &mut std::task::Context<'_>) -> std::task::Poll<<Self as std::future::Future>::Output>`

咱們還不是真正瞭解Pin<&mut Self>或者Context,不過咱們知道Output。由於咱們以前返回(),如今讓咱們照作。

use std::pin::Pin;
use std::task::{Context, Poll};

impl Future for DoNothing {
    type Output = ();

    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        unimplemented!()
    }
}

喔!編譯經過了!固然在運行時它會失敗,由於咱們的unimplemented!()調用:

thread 'async-std/executor' panicked at 'not yet implemented', src/main.rs:13:9

如今讓咱們嘗試實現poll。咱們須要返回一個值其類型爲Poll<Self::Output>或者 Poll<()>。讓咱們看一下Poll的定義:

pub enum Poll<T> {
    Ready(T),
    Pending,
}

利用一些基本的推理,咱們能夠理解Ready表示「咱們的Future已經完成,這是輸出」,而Pending表示「還沒完事兒」。假設咱們的DoNothing但願當即返回()類型的輸出,能夠這樣:

fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output> {
    Poll::Ready(())
}

恭喜!你剛剛實現了本身的第一個Future結構!

七、async與函數返回值

還記得咱們說過async對函數作的第三件事嗎:自動將結果值封裝爲一個新的Future。咱們接下來展現這一點。

首先簡化sleepus的定義:

fn sleepus() -> impl Future<Output=()> {
    DoNothing
}

編譯和運行正常。如今切換回async風格:

async fn sleepus() {
    DoNothing
}

這時候會報錯:

error[E0271]: type mismatch resolving `<impl std::future::Future as std::future::Future>::Output == ()`
  --> src/main.rs:17:20
   |
17 | async fn sleepus() {
   |                    ^ expected struct `DoNothing`, found ()
   |
   = note: expected type `DoNothing`
              found type `()`

能夠看到,當你有了一個async函數或代碼塊,結果會自動封裝到一個Future實現對象裏。所以咱們須要返回一個impl Future<Output=DoNothing>。如今咱們的類型須要是Output=()

處理很簡單,只須要在DoNothing後面簡單添加.await:

async fn sleepus() {
    DoNothing.await
}

這讓咱們對.await的做用增長了一點直覺:它從DoNothing中提取Output值。不過,咱們依然並不真正瞭解它是如何實現的。如今讓咱們實現一個更復雜的Future來 繼續探索。


原文連接:Rust異步編程async/.await原理解析(一) — 匯智網

相關文章
相關標籤/搜索