上一篇我寫的文章Tensorflow Rust實戰上篇. 這一次咱們看看使用tensorflow創建了什麼,並經過http接口提供服務。隨着Actix Web1.0版本發佈,我認爲用它構建一些東西將是一個很好的時機。html
本文假設您對Futures及其運做方式有必定的瞭解。我將盡可能用更簡單的術語解釋,但理解Futures生態系統將很是有效地幫助閱讀本文。爲此,我建議你從tokio開始。git
有些人建議在深刻Futures以前等待async/await和friends功能發佈。我認爲你如今應該親自動手:異步編程老是頗有挑戰性。
再一次爲了避免耐煩的人,您能夠在actix-web分支上找到參考代碼:
https://github.com/cetra3/mtc...github
這裏的API很是簡單。咱們想模仿咱們在命令行上所作的事情:提交一張圖片,返回結果是一張圖片。爲了使事情變得有趣,咱們將提供一種方式:將邊界框以JSON數組返回。web
關於經過http協議提交二進制數據,我首先想到了幾種選擇:算法
我認爲最簡單是原始數據,因此讓咱們這樣作! multipart/form-data可能ok,可是你必須處理多個圖像的時候呢? JSON格式彷佛有點浪費,由於您不可避免地必須使用base64或相似的方式轉換二進制數據。編程
因此咱們的API是這樣的:json
在咱們上一篇博客中,咱們只是簡單地使用main函數來執行全部操做,但咱們必須一些重構才能與actix一塊兒使用。咱們但願將MTCNN行爲封裝爲結構,能夠傳遞和轉移。最終目標是在應用程序狀態下使用它。segmentfault
讓咱們將結構包含咱們想要的一切:api
首先,咱們建立一個新文件mtcnn.rs並加上結構體定義。數組
use tensorflow::{Graph, Session, Tensor}; pub struct Mtcnn { graph: Graph, session: Session, min_size: Tensor<f32>, thresholds: Tensor<f32>, factor: Tensor<f32> }
而後,如今咱們只是用new方法填充初始化內容。因爲其中一些值的初始化並不是絕對可靠,咱們將返回Result:
pub fn new() -> Result<Self, Box<dyn Error>> { let model = include_bytes!("mtcnn.pb"); let mut graph = Graph::new(); graph.import_graph_def(&*model, &ImportGraphDefOptions::new())?; let session = Session::new(&SessionOptions::new(), &graph)?; let min_size = Tensor::new(&[]).with_values(&[40f32])?; let thresholds = Tensor::new(&[3]).with_values(&[0.6f32, 0.7f32, 0.7f32])?; let factor = Tensor::new(&[]).with_values(&[0.709f32])?; Ok(Self { graph, session, min_size, thresholds, factor }) }
我將在這裏開始加快節奏,因此若是你遇到困難或不肯定發生了什麼,請查看 Tensorflow Rust實戰上篇,以解釋這裏發生的事情。
咱們已經添加了全部須要跑一個會話的東西。讓咱們建立一個須要API作什麼的方法:提交一張圖片,響應一些邊界框(框出人臉的位置):
pub fn run(&self, img: &DynamicImage) -> Result<Vec<BBoxes>, Status> { ... }
再一次,咱們響應了一個Result類型,由於在某些狀況下run方法會失敗。咱們使用Status類型來表示響應錯誤的類型。
像咱們先前的main方法,咱們須要壓平圖片的輸入:
let input = { let mut flattened: Vec<f32> = Vec::new(); for (_x, _y, rgb) in img.pixels() { flattened.push(rgb[2] as f32); flattened.push(rgb[1] as f32); flattened.push(rgb[0] as f32); } Tensor::new(&[img.height() as u64, img.width() as u64, 3]) .with_values(&flattened)? };
而後咱們將提供全部相關輸入。這與咱們以前的main方法相同,但咱們只是從self中借用值,而不是爲每次運行建立它們:
let mut args = SessionRunArgs::new(); args.add_feed( &self.graph.operation_by_name_required("min_size")?, 0, &self.min_size, ); args.add_feed( &self.graph.operation_by_name_required("thresholds")?, 0, &self.thresholds, ); args.add_feed( &self.graph.operation_by_name_required("factor")?, 0, &self.factor, ); args.add_feed(&self.graph.operation_by_name_required("input")?, 0, &input);
接下來,咱們抓住咱們想要的輸出:
let bbox = args.request_fetch(&self.graph.operation_by_name_required("box")?, 0); let prob = args.request_fetch(&self.graph.operation_by_name_required("prob")?, 0);
如今咱們設置了全部參數,咱們能夠跑session了:
&self.session.run(&mut args)?;
噢哦!咱們獲得一個編譯器錯誤:
error[E0596]: cannot borrow `self.session` as mutable, as it is behind a `&` reference --> src/mtcnn.rs:68:10 | 36 | pub fn run(&self, img: &DynamicImage) -> Result<DynamicImage, Box<dyn Error>> { | ----- help: consider changing this to be a mutable reference: `&mut self` ... 68 | &self.session.run(&mut args)?; | ^^^^^^^^^^^^ `self` is a `&` reference, so the data it refers to cannot be borrowed as mutable
事實證實,Session::run()方法採用&mut self。咱們能夠作些什麼來解決這個問題:
咱們選擇了第三種方式!
更新你的 Cargo.toml,指定git而不是cargo裏的crate版本號:
tensorflow = { git = "https://github.com/tensorflow/rust"}
自從咱們的main方法以來,這一點都沒有改變。咱們獲取邊界框,將它們放入咱們的BBox結構中:
//Our bounding box extents let bbox_res: Tensor<f32> = args.fetch(bbox)?; //Our facial probability let prob_res: Tensor<f32> = args.fetch(prob)?; //Let's store the results as a Vec<BBox> let mut bboxes = Vec::new(); let mut i = 0; let mut j = 0; //While we have responses, iterate through while i < bbox_res.len() { //Add in the 4 floats from the `bbox_res` array. //Notice the y1, x1, etc.. is ordered differently to our struct definition. bboxes.push(BBox { y1: bbox_res[i], x1: bbox_res[i + 1], y2: bbox_res[i + 2], x2: bbox_res[i + 3], prob: prob_res[j], // Add in the facial probability }); //Step `i` ahead by 4. i += 4; //Step `i` ahead by 1. j += 1; } debug!("BBox Length: {}, BBoxes:{:#?}", bboxes.len(), bboxes); Ok(bboxes)
到此,咱們的run方法完成了。
咱們打算響應表明BBox結構體的JSON,因此添加serde_derive中的Serialize(序列化相關模塊):
use serde_derive::Serialize; #[derive(Copy, Clone, Debug, Serialize)] pub struct BBox { pub x1: f32, pub y1: f32, pub x2: f32, pub y2: f32, pub prob: f32, }
咱們將要添加一個方法,輸入一張圖片和一個邊界框數組,響應輸出的圖片:
pub fn overlay(img: &DynamicImage, bboxes: &Vec<BBox>) -> DynamicImage
這裏也沒有多大的變化,只是響應了一張圖片而不是保存一個文件:
//Let's clone the input image let mut output_image = img.clone(); //Iterate through all bounding boxes for bbox in bboxes { //Create a `Rect` from the bounding box. let rect = Rect::at(bbox.x1 as i32, bbox.y1 as i32) .of_size((bbox.x2 - bbox.x1) as u32, (bbox.y2 - bbox.y1) as u32); //Draw a green line around the bounding box draw_hollow_rect_mut(&mut output_image, rect, LINE_COLOUR); } output_image
好的,咱們已經完成了咱們的Mtcnn結構體和方法!咱們能夠進一步嗎?是的,絕對能夠!但就目前而言,我認爲這就是咱們所須要的。咱們已經封裝了行爲並建立了一個很好用的幾個函數。
咱們再也不將它用做命令行程序,而是用做自託管的Web應用程序。由於咱們再也不有輸入和輸出文件,因此咱們須要更改應用程序所需的參數。
我認爲咱們最初應該拿到的惟一參數是監聽地址,即便這樣咱們也應該使用合理的默認值。因此讓咱們經過structopt的幫助來製做這個很是小的demo:
#[derive(StructOpt)] struct Opt { #[structopt( short = "l", long = "listen", help = "Listen Address", default_value = "127.0.0.1:8000" )] listen: String, }
Actix Web使用log crate來顯示errors和debug message。
讓咱們使用log替代println!。我喜歡使用pretty_env_logger,由於它將不一樣的級別打印爲不一樣的顏色,而且咱們可使用有用的時間戳。
pretty_env_logger仍然使用環境變量。那就讓咱們設置環境變量RUST_LOG,而後啓動咱們的logger。
//Set the `RUST_LOG` var if none is provided if env::var("RUST_LOG").is_err() { env::set_var("RUST_LOG", "mtcnn=DEBUG,actix_web=DEBUG"); } //Create a timestamped logger pretty_env_logger::init_timed();
這爲咱們的app和actix web設置了DEBUG級別日誌,但容許咱們經過環境變量更改日誌級別。
咱們須要將一些狀態傳遞給actix使用:Mtcnn結構體和run方法。你能夠經過多種方式傳遞狀態提供actix,但最簡單的方法應該是App::data方法。當咱們正在進入一個多線程世界時,咱們將不得不考慮Send/Sync。
好的,那麼咱們如何在線程之間分享數據呢?好吧,做爲第一步,我會看看std::sync。因爲咱們知道mtcnn的run函數不須要可變引用,只須要不可變self引用,咱們能夠將它包裝在Arc中。若是咱們不得不使用可變引用,那麼可能也須要Mutex,可是若是咱們使用tensorflow-rust的主分支,能夠避免這種狀況。
那麼讓咱們建立一個Arc:
let mtcnn = Arc::new(Mtcnn::new()?);
如今能夠實例化服務:
HttpServer::new(move || { App::new() //Add in our mtcnn struct, we clone the reference for each worker thread .data(mtcnn.clone()) //Add in a logger to see the requests coming through .wrap(middleware::Logger::default()) // Add in some routes here .service( ... ) }) .bind(&opt.listen)? // Use the listener from the command arguments .run()
總結一下咱們已完成的事情:
Actix Web是一個異步框架,使用tokio。咱們的function是同步,須要一些時間才能處理完成。換句話說,咱們的請求是阻塞的。咱們能夠混合使用同步和異步,固然,處理起來有點麻煩。
Actix 1.0大量使用Extractors,Extractors爲方法定義提供徹底不一樣形式。您指定但願接口接收的內容,actix將爲您進行串聯起來。請注意:這確實意味着在運行以前不能發現錯誤。我在web::Data參數中使用了錯誤的類型簽名時的一個示例。
那麼咱們須要從咱們的請求中提取什麼?request body的bytes和mtcnn:
fn handle_request( stream: web::Payload, mtcnn: web::Data<Arc<Mtcnn>>, ) -> impl Future<Item = HttpResponse, Error = ActixError> { ... }
咱們將在mtcnn中使用這種類型(web::Data<Arc<Mtcnn>>),所以讓咱們爲它建立一個類型別名:
type WebMtcnn = web::Data<Arc<Mtcnn>>;
注:這裏的payload指的是http請求中header後面的部分。
咱們須要一種從payload中檢索圖像並返回Future的方法。 web::Payload結構體實現了Stream將Item設置爲Bytes。
從流中得到單個字節是沒有意義的,咱們想要得到整個批次並對圖像進行解碼!所以,讓咱們將Stream轉換爲Future,並將咱們將要得到的全部單個字節合併到一個大的字節桶中。聽起來很複雜,但幸運的是Stream有一個方法:concat2。
concat2是一個很是強大的組合器,它容許咱們將單個Stream輪詢的結果加入到一個集合中,若是該項實現了Extend(以及一些其它的trait),Bytes就會支持擴展。
所以就像這樣:
stream.concat2().and_then(....)
咱們須要解決的第二件事是:若是咱們要解碼出圖像,那麼會阻止線程直到解碼完成。若是它是一個巨大的圖像,它可能須要幾毫秒!所以,咱們但願確保在發生這種狀況時咱們不會發生阻塞。幸運的是,actix web有一種方法能夠將阻塞代碼包裝爲future:
stream.concat2().and_then(move |bytes| { web::block(move || { image::load_from_memory(&bytes) }) })
咱們採用stream,將其轉換爲 future 和 bytes,而後使用 web::block 將字節解碼爲後臺線程中的圖像並返回結果。load_from_memory 函數返回了一個Result,這意味着咱們能夠將其用做返回類型。
所以,咱們的 Item 被轉換爲 Bytes 再到 DynamicImage,但咱們尚未處理錯誤類型,沒法編譯經過。咱們的錯誤類型應該是什麼?讓咱們使用 actix_web::Error 做爲 ActixError:
use actix_web::{Error as ActixError} fn get_image(stream: web::Payload) -> impl Future<Item = DynamicImage, Error = ActixError> { stream.concat2().and_then(move |bytes| { web::block(move || { image::load_from_memory(&bytes) }) }) }
好吧,當咱們嘗試編譯時,出現了錯誤:
error[E0271]: type mismatch resolving `<impl futures::future::Future as futures::future::IntoFuture>::Error == actix_http::error::PayloadError` --> src/main.rs:67:22 | 67 | stream.concat2().and_then(move |bytes| { | ^^^^^^^^ expected enum `actix_threadpool::BlockingError`, found enum `actix_http::error::PayloadError` | = note: expected type `actix_threadpool::BlockingError<image::image::ImageError>` found type `actix_http::error::PayloadError` 還有一些未列出的內容...
當您組合 stream 時,將它們映射爲 future,以及嘗試從這些組合器得到一些輸出時,您實際上處理的是Item類型 和 Error類型 。
處理多種類型的響應結果會使代碼變得醜陋,這裏不像 Result類型可使用問號(?)自動調整到正確的錯誤。當 ops::Try 和 async/await語法變得穩定的時候,事情可能變得簡單,可是如今,咱們必須想辦法處理這些錯誤類型。
咱們可使用 from_err() 方法。做用跟問號(?)基本相同,區別是from_err做用於future。咱們有兩個正在處理的future:來自stream的字節數組 和 來自阻塞閉包的圖像。咱們有3種錯誤類型:the Payload error, the Image load from memory error, and the blocking error:
fn get_image(stream: web::Payload) -> impl Future<Item = DynamicImage, Error = ActixError> { stream.concat2().from_err().and_then(move |bytes| { web::block(move || { image::load_from_memory(&bytes) }).from_err() }) }
最重要的是,咱們須要run起來:
mtcnn.run(&img)
可是咱們想要在一個線程池裏跑起來:
web::block(|| mtcnn.run(&img))
讓咱們看看函數聲明。至少咱們須要圖像和mtcnn結構體。而後咱們想要返回BBox的Vec。咱們保持錯誤類型相同,所以咱們將使用ActixError類型。
函數聲明以下:
fn get_bboxes(img: DynamicImage, mtcnn: WebMtcnn) -> impl Future<Item = Vec<BBox>, Error = ActixError>
咱們須要在 web::block 上使用 from_err() 來轉換錯誤類型,使用move來將圖像提供給閉包:
fn get_bboxes(img: DynamicImage, mtcnn: WebMtcnn) -> impl Future<Item = Vec<BBox>, Error = ActixError> { web::block(move || mtcnn.run(&img)).from_err() }
但仍是會發生了編譯錯誤:
error[E0277]: `*mut tensorflow_sys::TF_Status` cannot be sent between threads safely --> src/main.rs:75:5 | 75 | web::block(move || mtcnn.run(&img)).from_err() | ^^^^^^^^^^ `*mut tensorflow_sys::TF_Status` cannot be sent between threads safely | = help: within `tensorflow::Status`, the trait `std::marker::Send` is not implemented for `*mut tensorflow_sys::TF_Status` = note: required because it appears within the type `tensorflow::Status` = note: required by `actix_web::web::block`
tensorflow::Status,它是錯誤類型,不能在線程之間發送。
快捷方式是將error轉換成String:
fn get_bboxes(img: DynamicImage, mtcnn: WebMtcnn) -> impl Future<Item = Vec<BBox>, Error = ActixError> { web::block(move || mtcnn.run(&img).map_err(|e| e.to_string())).from_err() }
由於String實現了Send,所以容許跨越線程間發送Result。
好的,咱們有2個函數,一個用於從請求中獲取圖像,另外一個用於獲取邊界框。咱們要返回回json HttpResponse:
fn return_bboxes( stream: web::Payload, mtcnn: WebMtcnn, ) -> impl Future<Item = HttpResponse, Error = ActixError> { // Get the image from the input stream get_image(stream) // Get the bounding boxes from the image .and_then(move |img| get_bboxes(img, mtcnn)) // Map the bounding boxes to a json HttpResponse .map(|bboxes| HttpResponse::Ok().json(bboxes)) }
接着,在App裏添接口定義:
HttpServer::new(move || { App::new() .data(mtcnn.clone()) .wrap(middleware::Logger::default()) // our new API service .service(web::resource("/api/v1/bboxes").to_async(return_bboxes)) }) .bind(&opt.listen)? .run()
run起來,用 curl 來提交一個請求:
$ curl --data-binary @rustfest.jpg http://localhost:8000/api/v1/bboxes [{"x1":471.4591,"y1":287.59888,"x2":495.3053,"y2":317.25327,"prob":0.9999908}....
使用 jmespath 來獲取120張臉:
$ curl -s --data-binary @rustfest.jpg http://localhost:8000/api/v1/bboxes | jp "length(@)" 120
咱們想要的另外一個API調用是返回一個覆蓋了邊界框的圖像。 這不是一個很大的延伸,但在圖像上繪製框確定是一個阻塞動做,因此咱們將其發送到線程池中運行。
讓咱們包裝疊加函數,將其轉換爲future:
fn get_overlay(img: DynamicImage, bboxes: Vec<BBox>) -> impl Future<Item = Vec<u8>, Error = ActixError> { web::block(move || { let output_img = overlay(&img, &bboxes); ... }).from_err() }
咱們想要返回一個u8字節的Vec,這樣咱們就能夠在返回體中使用它。 因此咱們須要分配緩衝區並以JPEG格式寫入:
let mut buffer = vec![]; output_img.write_to(&mut buffer, JPEG)?; // write out our buffer Ok(buffer)
將目前爲止的函數嘗試編譯一次:
fn get_overlay(img: DynamicImage, bboxes: Vec<BBox>) -> impl Future<Item = Vec<u8>, Error = ActixError> { web::block(move || { let output_img = overlay(&img, &bboxes); let mut buffer = Vec::new(); output_img.write_to(&mut buffer, JPEG)?; // write out our buffer Ok(buffer) }).from_err() }
還差一點, 咱們缺乏一個類型註解:
error[E0282]: type annotations needed --> src/main.rs:82:5 | 82 | web::block(move || { | ^^^^^^^^^^ cannot infer type for `E`
爲何這裏是類型問題?關聯到這一行:
Ok(buffer) // What's the `Error` type here?
目前,惟一的錯誤類型來自write_to方法,即ImageError。 可是這一行沒有錯誤類型,多是任何東西。
我想到三種方法處理這個問題:
方法一:在web::block中聲明錯誤
web::block::<_,_,ImageError>
這看上去有點凌亂,但能夠編譯經過。
方法二:使用 as 聲明 Result 類型:
Ok(buffer) as Result<_, ImageError>
方法三:使用map在成功時返回一個buffer:
output_img.write_to(&mut buffer, JPEG).map(|_| buffer)
我認爲爲了可讀性,#2多是最簡單的。 web::block函數須要3個類型的參數,這些參數在第一次閱讀代碼時可能會引發混淆。 #3也不錯,但我以爲它看起來有點奇怪。
最終個人選擇:
fn get_overlay(img: DynamicImage, bboxes: Vec<BBox>) -> impl Future<Item = Vec<u8>, Error = ActixError> { web::block(move || { let output_img = overlay(&img, &bboxes); let mut buffer = Vec::new(); output_img.write_to(&mut buffer, JPEG)?; // Type annotations required for the `web::block` Ok(buffer) as Result<_, ImageError> }).from_err() }
好的,咱們擁有了一些返回future的方法,future返回邊界框和疊加圖像。 讓咱們將它們拼接在一塊兒並返回一個HttpResponse:
fn return_overlay( stream: web::Payload, mtcnn: WebMtcnn, ) -> impl Future<Item = HttpResponse, Error = ActixError> { //... magic happens here }
第一步是從字節流中獲取圖像:
get_image(stream)
而後咱們想要獲取邊界框:
get_image(stream).and_then(move |img| { get_bboxes(img, mtcnn) })
如今咱們想要得到疊加圖像。 咱們有一個問題,如何使用image? get_bboxes返回future的圖像,而後計算image上的人臉返回一個邊界框數組。 這裏有幾個選擇。 當咱們將image傳遞給get_bboxes時,咱們能夠克隆image,但這會發生內存拷貝。 咱們能夠等待 Pin 和 async/await 語法完成,而後可能更容易處理它。
或者咱們能夠調整咱們的get_bboxes方法:
fn get_bboxes( img: DynamicImage, mtcnn: WebMtcnn, ) -> impl Future<Item = (DynamicImage, Vec<BBox>), Error = ActixError> { web::block(move || { mtcnn .run(&img) .map_err(|e| e.to_string()) //Return both the image and the bounding boxes .map(|bboxes| (img, bboxes)) }) .from_err() }
記錄把 return_bboxes 方法也修改了:
fn return_bboxes( stream: web::Payload, mtcnn: WebMtcnn, ) -> impl Future<Item = HttpResponse, Error = ActixError> { get_image(stream) .and_then(move |img| get_bboxes(img, mtcnn)) .map(|(_img, bboxes)| HttpResponse::Ok().json(bboxes)) }
若是rust能夠將元組變成命令參數,那就太好了。 不幸的是不適合咱們,因此咱們須要建立一個閉包:
//Create our image overlay .and_then(|(img, bbox)| get_overlay(img, bbox)) .map(|buffer| { // Return a `HttpResponse` here })
咱們的 HttpResponse 須要將 buffer 包裝到一個body:
HttpResponse::with_body(StatusCode::OK, buffer.into())
將 Content-Type設置爲jpeg:
let mut response = HttpResponse::with_body(StatusCode::OK, buffer.into()); response .headers_mut() .insert(CONTENT_TYPE, HeaderValue::from_static("image/jpeg"));
獲取疊加層的最終實現:
fn return_overlay( stream: web::Payload, mtcnn: WebMtcnn, ) -> impl Future<Item = HttpResponse, Error = ActixError> { get_image(stream) .and_then(move |img| { get_bboxes(img, mtcnn) }) .and_then(|(img, bbox) | get_overlay(img, bbox)) .map(|buffer| { let mut response = HttpResponse::with_body(StatusCode::OK, buffer.into()); response .headers_mut() .insert(CONTENT_TYPE, HeaderValue::from_static("image/jpeg")); response }) }
在App註冊此接口:
HttpServer::new(move || { App::new() .data(mtcnn.clone()) //Add in our data handler //Add in a logger to see the requets coming through .wrap(middleware::Logger::default()) //JSON bounding boxes .service(web::resource("/api/v1/bboxes").to_async(return_bboxes)) //Image overlay .service(web::resource("/api/v1/overlay").to_async(return_overlay)) }
run一下:
$ curl --data-binary @rustfest.jpg http://localhost:8000/api/v1/bboxes > output.jpg
結果:
咱們逐步將CLI應用程序轉換爲HTTP服務,並嘗試了異步編程。如您所見,actix web是一個很是通用的Web框架。 我對它的興趣來自於擁有構建Web應用程序所需的全部功能:多組件,線程池,高效率。雖然actix寫異步還不是很優雅,但將來可期,由於我認爲不少開發人員都在努力解決這個問題。
若是您正在尋找更多的actix示例,這個示例倉庫是您最好的選擇:https://github.com/actix/exam...
我期待看到社區將來的建設!