本文是基於Flink官網上Asynchronous I/O的介紹結合本身的理解寫成的,如有不正確的歡迎大夥留言交流,謝謝!html
將Flink用於流計算時,若涉及到和外部系統進行交互,如利用Flink從數據庫中讀取數據,這種須要獲取I/O的場景時,咱們須要考慮交互所帶來的時延問題。數據庫
爲分析如何減小時延,咱們先來分析一下,Flink以同步的形式方法外部系統(以MapFunction中和數據庫交互爲例)的過程,若圖1虛線左側所示,請求a發送到database後,MapFunction等待回覆後才進行下發送下一個請求b,期間,I/O處於空閒狀態,請求b又開始重複此過程,這樣在兩個來回的時間內(發送請求-收到結果爲一個來回),只處理兩個請求。如圖1虛線右側所示,一樣是在兩個來回的時間內,以異步的形式進行交互,請求a發出去後,在等待回覆時,請求b,c,d依次發出,這樣既能夠處理4個請求了。apache
圖1 同/異訪問數據庫方式對比圖(Ref[1])緩存
在某些場景下,爲了提升系統的吞吐能力,能夠僅經過增大MapFunction的併發度以達目的,可是隨之而來是資源的大量消耗。多線程
【重要事項】架構
1)爲了實現以異步I/O訪問數據庫或K/V存儲,數據庫等須要有能支持異步請求的client;如果沒有,能夠經過建立多個同步的client並使用線程池處理同步call的方式實現相似併發的client,可是這方式沒有異步I/O的性能好。併發
2)AsyncFunction不是以多線程方式調用的,一個AsyncFunction實例按順序爲每一個獨立消息發送請求;異步
3)目前(Flink 1.9),使用AsyncWaitOperator時要打斷operator chain(默認也是不使用),緣由見FLINK-13063。
async
因爲請求響應的快慢可能不同,AsyncFunction的「併發」請求可能致使結果的亂序 。如圖1中虛線右側所示,若請求b發出以後,其結果在請求a的以前返回,這樣異步I/O算子先後的消息順序就不一致了。爲了控制結果的返回順序,Flink提供了兩種模式:函數
1)Unordered:當異步的請求完成時,其結果立馬返回,不考慮結果順序即亂序模式。當以processing time做爲時間屬性時,該模式能夠得到最小的延時和最小的開銷,使用方式:AsyncDataStream.unorderedWait(...);
2)Ordered:該模式下,消息在異步I/O算子先後的順序一致,先請求的先返回,即有序模式。爲實現有序模式,算子將請求返回的結果放入緩存,直到該請求以前的結果所有返回或超時。該模式一般狀況下回引入額外的時延以及在checkpoint過程當中會帶來開銷,這是由於,和無序模式相比,消息和請求返回的結果都會在checkpoint的狀態中維持更長時間。使用方式:AsyncDataStream.orderedWait(...);
在此,咱們須要針對流任務和event time相結合的狀況進行補充說明。爲何?是由於watermark和消息的總體相對位置是不會變的,什麼意思了?發生在某個watermark以後的消息,只能在watermark被髮出以後發出,其請求結果也是。換句話說,兩個watermark之間的消息總體與watermark的有序的。固然這個區間內消息之間是否有序這得根據使用的模式來分析。
1)對Ordered模式,由於消息自己是有序的,因此watermark和消息之間也是有序的,和processing time相比,其不須要引入額外的開銷;
2)對Unordered模式,其模式是先響應先返回,但在與event time結合的狀況裏,消息或結果都需在特定watermark發出以後才能發出,此時,就會引入延時和開銷,其開銷的大小取決於watermark的頻率,其緣由參加下文原理部分。
爲更加詳細的說明異步I/O的實現過程,先說明幾個term,其中也會涉及其基本用法,若分析原理只看其含義便可。
1)AsyncFunction:異步I/O的觸發接口
AsyncFunction在AsyncWaitOperator中做爲一個用戶函數,相似FlatMap,有open()/processElement(StreamRecord< in > record)/processWatermark(Watermark mark)方法。
對於用戶本身實現的AsyncFunction,必須重寫asyncInvoke(IN input, AsyncCollector collector)來提供調用異步操做的代碼。
2)AsyncWaitOperator:調用AsyncFunction的流算子,是個抽象的概念,具體算子是unorderedWait(...)或orderedWait(...)
3)AsyncCollector:
AsyncCollector由AsyncWaitOperator建立,並傳遞給AsyncFunction,在這裏它應該被添加到用戶的回調函數中。它充當從用戶代碼中獲取結果或錯誤的角色,並通知AsyncCollectorBuffer發出結果。
4)AsyncCollectorBuffer:AsyncCollectorBuffer保存全部的AsyncCollector,並將結果發送給下一個節點。
上述概念是工做示意圖可參見Ref[2]。
在流式計算中,涉及異步I/O的總體過程圖以下:
圖2 異步I/O架構圖(Ref[2])
1)消息達到AsyncWaitOperator後正常處理過程以下:
AsyncWaitOperator調用AsyncFunction,並建立AsyncCollector傳遞給AsyncFunction。AsyncCollector等待獲取到返回結果(異常)以後將入到AsyncCollectorBuffer保存時,會將一條mark消息放入AsyncCollectorBuffer中,而後一個signal信息將會發送到Emitter 線程,若此時是將消息發送出去的signal,則會將消息發送出去並通知task thread加消息到collector buffer中。至於怎麼發要依據代碼中設置的模式是有序仍是無序,如果有序則發head,刪head。該過程的更詳細過程以下圖:
圖3 異步I/O正常處理消息圖(Ref[2])
2)checkpoint過程
AsyncWaitOperator先是對AsyncCollectorBuffer中全部的輸入流數據進行掃描,完成後就刪除state中老的數據,而後將AsyncCollectorBuffer中數據存入到state中,而不是在處理時對單個輸入流一個接一個的存入state,具體過程圖見圖2或圖4。
3)故障恢復
在恢復AsyncWaitOperator的狀態時,AsyncWaitOperator將scan狀態中的全部元素,獲取AsyncCollectors,調用AsyncFunction.asyncInvoke()並將它們插入AsyncCollectorBuffer中,具體的以下:
圖4 故障恢復和checkpoint流程圖(Ref[2])
總結:
關於具體使用的方法見後期的博客,建議大夥看看原文,一千個讀者就有一千個哈姆雷特!
[1]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/asyncio.html
[2]https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673
[3]https://blog.icocoro.me/2019/05/26/1905-apache-flinkv2-asyncio/