Async I/O 是阿里巴巴貢獻給社區的一個呼聲很是高的特性,於1.2版本引入。主要目的是爲了解決與外部系統交互時網絡延遲成爲了系統瓶頸的問題。數據庫
對於實時處理,當須要使用外部存儲數據染色的時候,須要當心對待,不能讓與外部系統之間的交互延遲對流處理的整個工做進度起決定性的影響。緩存
在mapfunction等算子裏訪問外部存儲,實際上該交互過程是同步的:好比請求a發送到數據庫,那麼mapfunction會一直等待響應。在不少案例中,這個等待過程是很是浪費函數時間的。網絡
與數據庫異步交互,意味着單個函數實例能夠併發處理不少請求,同時併發接收響應。那麼,等待時間因爲發送其它請求和接收其它響應,被重複使用而節省了。至少,等待時間在多個請求上被攤銷。這就使得不少使用案例具備更高的吞吐量。併發
注意:經過增長MapFunction的到一個較大的並行度也是能夠改善吞吐量的,可是這就意味着更高的資源開銷:更多的MapFunction實例意味着更多的task,線程,flink內部網絡鏈接,數據庫的連接,緩存,更多內部狀態開銷。app
1. 前提異步
正確的實現flink的異步IO功能,須要所鏈接的數據庫支持異步客戶端。幸運的是不少流行的數據庫支持這樣的客戶端。async
假如沒有異步客戶端,也能夠建立多個同步客戶端,放到線程池裏,使用線程池來完成異步功能。固然,該種方式相對於異步客戶端更低效。ide
2. 異步IO API函數
flink異步IO的API支持用戶在data stream中使用異步請求客戶端。API自身處理與數據流的整合,消息順序,時間時間,容錯等。post
假若有目標數據庫的異步客戶端,使用異步IO,須要實現一下三步:
實現AsyncFunction,該函數實現了請求分發的功能。
一個callback回調,該函數取回操做的結果,而後傳遞給ResultFuture。
對DataStream使用異步IO操做。
下面的代碼,生名了一個基本模板:
// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// issue the asynchronous request, receive a future for result
final Future<String> result = client.query(key);
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// Normally handled explicitly.
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// create the original stream
DataStream<String> stream = ...;
// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
重要提示:
第一次調用 ResultFuture.complete的時候 ResultFuture就會完成。全部後續的complete調用都會被忽略。
下面也有兩個參數須要注意一下:
Timeout
異步IO請求被視爲失敗的超時時間,超過該時間異步請求就算失敗。該參數主要是爲了剔除死掉或者失敗的請求。
Capacity
該參數定義了同時最多有多少個異步請求在處理。即便異步IO的方式會致使更高的吞吐量,可是對於實時應用來講該操做也是一個瓶頸。限制併發請求數,算子不會積壓過多的未處理請求,可是一旦超過容量的顯示會觸發背壓。
3. 超時處理
當一個異步IO請求屢次超時,默認狀況下會拋出一個異常,而後重啓job。若是想處理超時,能夠覆蓋AsyncFunction#timeout方法。
4. 結果的順序
AsyncFunction發起的併發請求完成的順序是不可預期的。爲了控制結果發送的順序,flink提供了兩種模式:
1). Unordered
結果記錄在異步請求結束後馬上發送。流中的數據在通過該異步IO操做後順序就和之前不同了。當使用處理時間做爲基礎時間特性的時候,該方式具備極低的延遲和極低的負載。調用方式
AsyncDataStream.unorderedWait(...)
(處理時間無序圖)
2). Ordered
該種方式流的順序會被保留。結果記錄發送的順序和異步請求被觸發的順序同樣,該順序就是願意流中事件的順序。爲了實現該目標,操做算子會在該結果記錄以前的記錄爲發送以前緩存該記錄。這每每會引入額外的延遲和一些Checkpoint負載,由於相比於無序模式結果記錄會保存在Checkpoint狀態內部較長的時間。調用方式
AsyncDataStream.orderedWait(...)
5. 事件時間
當使用事件時間的時候,異步IO操做也會正確的處理watermark機制。這就意味着兩種order模式的具體操做以下:
1). Unordered
watermark不會超過記錄,反之亦然,意味着watermark創建了一個order邊界。記錄僅會在兩個watermark之間無序發射。watermark以後的記錄僅會在watermark發送以後發送。watermark也僅會在該watermark以前的全部記錄發射完成以後發送。
這就意味着在存在watermark的狀況下,無序模式引入了一些與有序模式相同的延遲和管理開銷。開銷的大小取決於watermark的頻率。
(事件時間無序圖)
2). Ordered
watermark的順序就如記錄的順序同樣被保存。與處理時間相比,開銷沒有顯著變化。
請記住,注入時間 Ingestion Time是基於源處理時間自動生成的watermark事件時間的特殊狀況。
6. 容錯擔保
異步IO操做提供了僅一次處理的容錯擔保。它會將在傳出的異步IO請求保存於Checkpoint,而後故障恢復的時候從Checkpoint中恢復這些請求。
詳細的案例公衆號回覆 異步 便可獲得詳細的案例。