Async I/O 是阿里巴巴貢獻給社區的一個呼聲很是高的特性,於1.2版本引入。主要目的是爲了解決與外部系統交互時網絡延遲成爲了系統瓶頸的問題。html
流計算系統中常常須要與外部系統進行交互,好比須要查詢外部數據庫以關聯上用戶的額外信息。一般,咱們的實現方式是向數據庫發送用戶a
的查詢請求,而後等待結果返回,在這以前,咱們沒法發送用戶b
的查詢請求。這是一種同步訪問的模式,以下圖左邊所示。數據庫
圖中棕色的長條表示等待時間,能夠發現網絡等待時間極大地阻礙了吞吐和延遲。爲了解決同步訪問的問題,異步模式能夠併發地處理多個請求和回覆。也就是說,你能夠連續地向數據庫發送用戶a
、b
、c
等的請求,與此同時,哪一個請求的回覆先返回了就處理哪一個回覆,從而連續的請求之間不須要阻塞等待,如上圖右邊所示。這也正是 Async I/O 的實現原理。promise
使用 Async I/O 的前提是須要一個支持異步請求的客戶端。固然,沒有異步請求客戶端的話也能夠將同步客戶端丟到線程池中執行做爲異步客戶端。Flink 提供了很是簡潔的API,讓用戶只須要關注業務邏輯,一些髒活累活好比消息順序性和一致性保證都由框架處理了,多麼棒的事情!網絡
使用方式以下方代碼片斷所示(來自官網文檔):併發
/** 'AsyncFunction' 的一個實現,向數據庫發送異步請求並設置回調 */ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] { /** 能夠異步請求的特定數據庫的客戶端 */ lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials) /** future 的回調的執行上下文(當前線程) */ implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor()) override def asyncInvoke(str: String, asyncCollector: AsyncCollector[(String, String)]): Unit = { // 發起一個異步請求,返回結果的 future val resultFuture: Future[String] = client.query(str) // 設置請求完成時的回調: 將結果傳遞給 collector resultFuture.onSuccess { case result: String => asyncCollector.collect(Iterable((str, result))); } } } // 建立一個原始的流 val stream: DataStream[String] = ... // 添加一個 async I/O 的轉換 val resultStream: DataStream[(String, String)] = AsyncDataStream.(un)orderedWait( stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, // 超時時間 100) // 進行中的異步請求的最大數量
AsyncDataStream
有兩個靜態方法,orderedWait
和 unorderedWait
,對應了兩種輸出模式:有序和無序。框架
AsyncDataStream.(un)orderedWait
的主要工做就是建立了一個 AsyncWaitOperator
。AsyncWaitOperator
是支持異步 IO 訪問的算子實現,該算子會運行 AsyncFunction
並處理異步返回的結果,其內部原理以下圖所示。異步
如圖所示,AsyncWaitOperator
主要由兩部分組成:StreamElementQueue
和 Emitter
。StreamElementQueue 是一個 Promise 隊列,所謂 Promise 是一種異步抽象表示未來會有一個值(參考 Scala Promise 瞭解更多),這個隊列是未完成的 Promise 隊列,也就是進行中的請求隊列。Emitter 是一個單獨的線程,負責發送消息(收到的異步回覆)給下游。async
圖中E5
表示進入該算子的第五個元素(」Element-5」),在執行過程當中首先會將其包裝成一個 「Promise」 P5
,而後將P5
放入隊列。最後調用 AsyncFunction
的 ayncInvoke
方法,該方法會向外部服務發起一個異步的請求,並註冊回調。該回調會在異步請求成功返回時調用 AsyncCollector.collect
方法將返回的結果交給框架處理。實際上 AsyncCollector
是一個 Promise ,也就是 P5
,在調用 collect
的時候會標記 Promise 爲完成狀態,並通知 Emitter 線程有完成的消息能夠發送了。Emitter 就會從隊列中拉取完成的 Promise ,並從 Promise 中取出消息發送給下游。分佈式
上文提到 Async I/O 提供了兩種輸出模式。其實細分有三種模式: 有序,ProcessingTime 無序,EventTime 無序。Flink 使用隊列來實現不一樣的輸出模式,並抽象出一個隊列的接口(StreamElementQueue
),這種分層設計使得AsyncWaitOperator
和Emitter
不用關心消息的順序問題。StreamElementQueue
有兩種具體實現,分別是 OrderedStreamElementQueue
和 UnorderedStreamElementQueue
。UnorderedStreamElementQueue
比較有意思,它使用了一套邏輯巧妙地實現徹底無序和 EventTime 無序。ide
有序比較簡單,使用一個隊列就能實現。全部新進入該算子的元素(包括 watermark),都會包裝成 Promise 並按到達順序放入該隊列。以下圖所示,儘管P4
的結果先返回,但並不會發送,只有 P1
(隊首)的結果返回了纔會觸發 Emitter 拉取隊首元素進行發送。
ProcessingTime 無序也比較簡單,由於沒有 watermark,不須要協調 watermark 與消息的順序性,因此使用兩個隊列就能實現,一個 uncompletedQueue
一個 completedQueue
。全部新進入該算子的元素,一樣的包裝成 Promise 並放入 uncompletedQueue
隊列,當uncompletedQueue
隊列中任意的Promise返回了數據,則將該 Promise 移到 completedQueue
隊列中,並通知 Emitter 消費。以下圖所示:
EventTime 無序相似於有序與 ProcessingTime 無序的結合體。由於有 watermark,須要協調 watermark 與消息之間的順序性,因此uncompletedQueue
中存放的元素從原先的 Promise 變成了 Promise 集合。若是進入算子的是消息元素,則會包裝成 Promise 放入隊尾的集合中。若是進入算子的是 watermark,也會包裝成 Promise 並放到一個獨立的集合中,再將該集合加入到 uncompletedQueue
隊尾,最後再建立一個空集合加到 uncompletedQueue
隊尾。這樣,watermark 就成了消息順序的邊界。只有處在隊首的集合中的 Promise 返回了數據,才能將該 Promise 移到 completedQueue
隊列中,由 Emitter 消費發往下游。只有隊首集合空了,才能處理第二個集合。這樣就保證了當且僅當某個 watermark 以前全部的消息都已經被髮送了,該 watermark 才能被髮送。過程以下圖所示:
分佈式快照機制是爲了保證狀態的一致性。咱們須要分析哪些狀態是須要快照的,哪些是不須要的。首先,已經完成回調而且已經發往下游的元素是不須要快照的。不然,會致使重發,那就不是 exactly-once 了。而已經完成回調且未發往下游的元素,加上未完成回調的元素,就是上述隊列中的全部元素。
因此快照的邏輯也很是簡單,(1)清空原有的狀態存儲,(2)遍歷隊列中的全部 Promise,從中取出 StreamElement
(消息或 watermark)並放入狀態存儲中,(3)執行快照操做。
恢復的時候,從快照中讀取全部的元素所有再處理一次,固然包括以前已完成回調的元素。因此在失敗恢復後,會有元素重複請求外部服務,可是每一個回調的結果只會被髮往下游一次。