Flink常見Checkpoint超時問題排查思路
flink 之 Checkpoint 出現的錯誤html
Checkpoint完全解密java
轉至元數據結尾apache
轉至元數據起始編程
現狀:已發佈安全
討論主題:http: //apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-for-Asynchronous-IO-in-FLINK-tt13497.htmlapp
JIRA: FLINK-4391-爲已解決的流提供異步操做支持 異步
發佈: Flink 1.2async
Google文檔:https: //docs.google.com/document/d/1Lr9UYXEz6s6R_3PWg3bZQLF3upGaNEkc0rQCFSzaYDI/editide
請將討論保留在郵件列表上,而不是評論維基(維基討論快速笨拙)。函數
在大多數狀況下,I / O訪問是一個耗時的過程,使得單個操做員的TPS遠低於內存計算,特別是對於流式做業,低延遲是用戶最關心的問題。啓動多個線程多是處理此問題的一個選項,但缺點是顯而易見的:最終用戶的編程模型可能會變得更加複雜,由於他們必須在運算符中實現線程模型。此外,他們必須注意與檢查點協調。
AsyncFunction:異步I / O將在AsyncFunction中觸發。
AsyncWaitOperator:一個將調用AsyncFunction的StreamOperator。
AsyncCollector:對於每一個輸入流記錄,將建立AsyncCollector並將其傳遞到用戶的回調以獲取異步i / o結果。
AsyncCollectorBuffer:保留全部AsyncCollector的緩衝區。
發送器線程:AsyncCollectorBuffer中的一個工做線程,當一些AsyncCollectors完成異步i / o並將結果發送到如下操做符時發出信號。
添加了一個名爲AsyncDataStream的輔助類,以提供將AsyncFunction(將執行異步i / o操做)添加到FLINK流做業的方法。
1
2
3
4
五
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public class AsyncDataStream {
/**
* Add an AsyncWaitOperator. The order of output stream records may be reordered.
*
* @param in Input data stream
* @param func AsyncFunction
* @bufSize The max number of async i/o operation that can be triggered
* @return A new DataStream.
*/
public static DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, int bufSize);
public static DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func);
/**
* Add an AsyncWaitOperator. The order of output stream records is guaranteed to be the same as input ones.
*
* @param func AsyncWaitFunction
* @param func AsyncFunction
* @bufSize The max number of async i/o operation that can be triggered
* @return A new DataStream.
*/
public static DataStream<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, int bufSize);
public static DataStream<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func);
}
|
下圖說明了如何處理流式傳輸記錄
AsyncFunction 在AsyncWaitOperator中用做函數,它看起來像StreamFlatMap運算符,具備open()/ processElement(StreamRecord <IN> record)/ processWatermark(Watermark mark)。
對於用戶的混凝土AsyncFunction,所述asyncInvoke(IN輸入,AsyncCollector <OUT>集電極)必須重寫以供應代碼開始異步操做。
1
2
3
4
五
6
7
8
9
10
11
12
13
14
15
16
|
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
/**
* Trigger async operation for each stream input.
* The AsyncCollector should be registered into async client.
*
* @param input Stream Input
* @param collector AsyncCollector
*/
void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}
public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction
implements AsyncFunction<IN, OUT> {
@Override
public abstract void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}
|
對於AsyncWaitOperator的每一個輸入流的記錄,它們將被處理經過AsyncFunction.asyncInvoke(IN輸入,AsyncCollector <OUT> CB)。而後AsyncCollector將附加到AsyncCollectorBuffer中。稍後咱們將介紹AsyncCollector和AsyncCollectorBuffer。
AsyncCollector由AsyncWaitOperator建立,並傳遞到AsyncFunction,它應該被添加到用戶的回調中。它充當從用戶代碼獲取結果或錯誤的角色,並通知AsyncCollectorBuffer發出結果。
特定於用戶的函數是collect,而且應該在異步操做完成或拋出錯誤時調用它們。
1
2
3
4
五
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
|
public class AsyncCollector<OUT> {
private List<OUT> result;
private Throwable error;
private AsyncCollectorBuffer<OUT> buffer;
/**
* Set result
* @param result A list of results.
*/
public void collect(List<OUT> result) {
this .result = result;
buffer.mark( this );
}
/**
* Set error
* @param error A Throwable object.
*/
public void collect(Throwable error) {
this .error = error;
buffer.mark( this );
}
/**
* Get result. Throw RuntimeException while encountering an error.
* @return A List of result.
* @throws RuntimeException RuntimeException wrapping errors from user codes.
*/
public List<OUT> getResult() throws RuntimeException { ... }
}
|
在調用AsyncFunction.asyncInvoke(IN輸入,AsyncCollector <OUT>收集器)以前,AsyncWaitOperator將嘗試從AsyncCollectorBuffer 獲取AsyncCollector 的實例。而後它將被帶入用戶的回調函數。若是緩衝區已滿,它將等待一些正在進行的回調完成。
異步操做完成後,AsyncCollector.collect()將獲取結果或錯誤,並將通知AsyncCollectorBuffer。
AsyncCollector由FLINK實現。
AsyncCollectorBuffer保留全部AsyncCollectors,並將結果發送到下一個節點。
調用AsyncCollector.collect()時,標記將放在AsyncCollectorBuffer中,表示已完成的AsyncCollectors。一個名爲Emitter的工做線程也將在AsyncCollector獲取結果後發出信號,而後根據有序或無序設置嘗試發出結果。
爲簡單起見,咱們將在如下文本中將任務引用到AsyncCollectorBuffer中的AsycnCollector。
根據用戶配置,將保證或不保證輸出元素的順序。若是不能保證,稍後發佈的完成的AsyncCollectors將會更早發出。
線程將等待完成的AsyncCollectors。在發出信號時,它將處理緩衝區中的任務,以下所示:
有序模式
若是緩衝區中的第一個任務完成,則Emitter將收集其結果,而後繼續執行第二個任務。若是第一項任務還沒有完成,請再次等待。
無序模式
檢查緩衝區中的全部已完成任務,並從緩衝區中最先的水印以前的那些任務中收集結果。
該線程和任務線程將訪問徹底 經過獲取/釋放鎖。
信號 任務線程在全部任務完成後通知它已經處理完全部數據,而且能夠關閉操做員。
從緩衝區中刪除一些任務後的Signal Task Thread。
傳播任務線程的異常。
僅 針對發射qi線程訪問AsyncCollectorBuffer 。
獲取並向緩衝區添加新的AsyncCollector,等待緩衝區已滿。
全部水印也將保存在AsyncCollectorBuffer中。當且僅當在發出當前水印以前的全部AsyncCollector以後纔會發出水印。
全部輸入StreamRecords都將保持狀態。而不是在處理時逐個將每一個輸入流記錄存儲到狀態,AsyncWaitOperator將在快照操做符狀態時將AsyncCollectorBuffer中的全部輸入流記錄置於狀態。在持久保存這些記錄以前,將清除狀態中的舊數據。
當全部障礙,在操做員已經抵達,安檢點能夠進行當即。
在恢復操做員狀態時,操做員將掃描狀態中的全部元素,獲取AsyncCollectors,調用AsyncFunction.asyncInvoke()並將它們插回AsyncCollectorBuffer。
對於在同一個TaskManager(也就是相同的JVM)中的不一樣插槽(任務工做者)之間共享異步資源(如鏈接到hbase,netty鏈接)的狀況,咱們可使鏈接靜態,以便同一進程中的全部線程均可以共享相同的實例。
固然,請在使用這些資源時注意線程安全。
1
2
3
4
五
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
/*** ***/
public class HBaseAsyncFunction implements AsyncFunction<String, String> {
// initialize it while reading object
transient Connection connection;
@Override
public void asyncInvoke(String val, AsyncCollector<String> c) {
Get get = new Get(Bytes.toBytes(val));
Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes( "test" )));
// UserCallback is from user’s async client.
((AsyncableHTableInterface) ht).asyncGet(get, new UserCallback(c));
}
}
// create data stream
public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
DataStream<String> source = getDataStream(env);
DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
}
|
1
2
3
4
五
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
31
32
33
34
35
36
37
38
|
public class HBaseAsyncFunction implements AsyncFunction<String, String> {
// initialize it while reading object
transient Connection connection;
@Override
public void asyncInvoke(String val, AsyncCollector<String> c) {
Get get = new Get(Bytes.toBytes(val));
Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes( "test" )));
ListenableFuture<Result> future = ht.asyncGet(get);
new FutureCallback<Result>() {
@Override public void onSuccess(Result result) {
List ret = new ArrayList<String>();
}
@Override public void onFailure(Throwable t) {
}
},
);
}
}
// create data stream
public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
DataStream<String> source = getDataStream(env);
DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
}
|
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673