flink異步io

Flink常見Checkpoint超時問題排查思路
flink 之 Checkpoint 出現的錯誤html

Checkpoint完全解密java

 

 

 


轉至元數據結尾apache

 

轉至元數據起始編程

 

狀態

現狀:已發佈安全

討論主題http:  //apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-for-Asynchronous-IO-in-FLINK-tt13497.htmlapp

JIRA:  FLINK-4391-已解決的流提供異步操做支持   異步

發佈:  Flink 1.2async

Google文檔https:  //docs.google.com/document/d/1Lr9UYXEz6s6R_3PWg3bZQLF3upGaNEkc0rQCFSzaYDI/editide

請將討論保留在郵件列表上,而不是評論維基(維基討論快速笨拙)。函數

動機

在大多數狀況下,I / O訪問是一個耗時的過程,使得單個操做員的TPS遠低於內存計算,特別是對於流式做業,低延遲是用戶最關心的問題。啓動多個線程多是處理此問題的一個選項,但缺點是顯而易見的:最終用戶的編程模型可能會變得更加複雜,由於他們必須在運算符中實現線程模型。此外,他們必須注意與檢查點協調。

條款

AsyncFunction:異步I / O將在AsyncFunction中觸發。

AsyncWaitOperator:一個將調用AsyncFunction的StreamOperator。

AsyncCollector:對於每一個輸入流記錄,將建立AsyncCollector並將其傳遞到用戶的回調以獲取異步i / o結果。

AsyncCollectorBuffer:保留全部AsyncCollector的緩衝區。

發送器線程:AsyncCollectorBuffer中的一個工做線程,當一些AsyncCollectors完成異步i / o並將結果發送到如下操做符時發出信號。

公共接口

添加了一個名爲AsyncDataStream的輔助類,以提供將AsyncFunction(將執行異步i / o操做)添加到FLINK流做業的方法。

AsyncDataStream.java
1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class AsyncDataStream {
 /**
  * Add an AsyncWaitOperator. The order of output stream records may be reordered.
  *
  * @param in Input data stream
  * @param func AsyncFunction
  * @bufSize The max number of async i/o operation that can be triggered
  * @return A new DataStream.
  */
 public static DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, int bufSize);
 public static DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func);
  
 /**
  * Add an AsyncWaitOperator. The order of output stream records is guaranteed to be the same as input ones.
  *
  * @param func AsyncWaitFunction
  * @param func AsyncFunction
  * @bufSize The max number of async i/o operation that can be triggered
  * @return A new DataStream.
  */
 public static DataStream<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, int bufSize);
 public static DataStream<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func);
}

提議的變動

概觀

下圖說明了如何處理流式傳輸記錄

  • 到達AsyncWaitOperator
  • 從任務故障轉移中恢復
  • 快照狀態
  • 由Emitter Thread發出

序列圖

AsyncFunction

AsyncFunction 在AsyncWaitOperator中用做函數,它看起來像StreamFlatMap運算符,具備open()/ processElement(StreamRecord <IN> record)/ processWatermark(Watermark mark)。

對於用戶的混凝土AsyncFunction,所述asyncInvoke(IN輸入,AsyncCollector <OUT>集電極)必須重寫以供應代碼開始異步操做。

AsyncFunction.java
1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
  /**
   * Trigger async operation for each stream input.
   * The AsyncCollector should be registered into async client.
   *
   * @param input Stream Input
   * @param collector AsyncCollector
   */
  void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}
  
public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction
    implements AsyncFunction<IN, OUT> {
  @Override
  public abstract void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}

對於AsyncWaitOperator的每一個輸入流的記錄,它們將被處理經過AsyncFunction.asyncInvoke(IN輸入,AsyncCollector <OUT> CB)。而後AsyncCollector將附加到AsyncCollectorBuffer中。稍後咱們將介紹AsyncCollector和AsyncCollectorBuffer。 

AsyncCollector 

AsyncCollector由AsyncWaitOperator建立,並傳遞到AsyncFunction,它應該被添加到用戶的回調中。它充當從用戶代碼獲取結果或錯誤的角色,通知AsyncCollectorBuffer發出結果。

 特定於用戶的函數是collect,而且應該在異步操做完成或拋出錯誤時調用它們。

AsyncCollector.java
1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
public class AsyncCollector<OUT> {
  private List<OUT> result;
  private Throwable error;
  private AsyncCollectorBuffer<OUT> buffer;
 
  /**
   * Set result
   * @param result A list of results.
   */
  public void collect(List<OUT> result) {
    this.result = result;
    buffer.mark(this);
  }
 
  /**
   * Set error
   * @param error A Throwable object.
   */
  public void collect(Throwable error) {
    this.error = error;
    buffer.mark(this);
  }
 
  /**
   * Get result. Throw RuntimeException while encountering an error.
   * @return A List of result.
   * @throws RuntimeException RuntimeException wrapping errors from user codes.
   */
  public List<OUT> getResult() throws RuntimeException { ... }
}

它是如何使用的

在調用AsyncFunction.asyncInvoke(IN輸入,AsyncCollector <OUT>收集器)以前,AsyncWaitOperator將嘗試從AsyncCollectorBuffer 獲取AsyncCollector 的實例而後它將被帶入用戶的回調函數。若是緩衝區已滿,它將等待一些正在進行的回調完成。 

異步操做完成後,AsyncCollector.collect()將獲取結果或錯誤,並將通知AsyncCollectorBuffer。

AsyncCollector由FLINK實現。

AsyncCollectorBuffer

AsyncCollectorBuffer保留全部AsyncCollectors,並將結果發送到下一個節點。

調用AsyncCollector.collect()時,標記將放在AsyncCollectorBuffer中,表示已完成的AsyncCollectors。一個名爲Emitter的工做線程也將在AsyncCollector獲取結果後發出信號,而後根據有序或無序設置嘗試發出結果。

爲簡單起見,咱們將在如下文本中將任務引用到AsyncCollectorBuffer中的AsycnCollector。

 

有序和無序

根據用戶配置,將保證或不保證輸出元素的順序。若是不能保證,稍後發佈的完成的AsyncCollectors將會更早發出。

線程

線程將等待完成的AsyncCollectors。在發出信號時,它將處理緩衝區中的任務,以下所示:

  • 有序模式

若是緩衝區中的第一個任務完成,則Emitter將收集其結果,而後繼續執行第二個任務。若是第一項任務還沒有完成,再次等待

  • 無序模式

檢查緩衝區中的全部已完成任務,並從緩衝區中最先的水印以前的那些任務中收集結果。

該線程和任務線程將訪問徹底 經過獲取/釋放鎖。

信號 任務線程在全部任務完成後通知它已經處理完全部數據,而且能夠關閉操做員。

從緩衝區中刪除一些任務後的Signal Task Thread。

傳播任務線程的異常

任務線程

 針對發射qi線程訪問AsyncCollectorBuffer 

獲取並向緩衝區添加新的AsyncCollector,等待緩衝區已滿。

水印

全部水印也將保存在AsyncCollectorBuffer中。當且僅當在發出當前水印以前的全部AsyncCollector以後纔會發出水印。 

狀態,故障轉移和檢查點

州和檢查站

全部輸入StreamRecords都將保持狀態。而不是在處理時逐個將每一個輸入流記錄存儲到狀態,AsyncWaitOperator將在快照操做符狀態時將AsyncCollectorBuffer中的全部輸入流記錄置於狀態。在持久保存這些記錄以前,將清除狀態中的舊數據。

當全部障礙,在操做員已經抵達,安檢點能夠進行當即

故障轉移

在恢復操做員狀態時,操做員將掃描狀態中的全部元素,獲取AsyncCollectors,調用AsyncFunction.asyncInvoke()並將它們插回AsyncCollectorBuffer。

 

筆記

異步資源共享

對於在同一個TaskManager(也就是相同的JVM)中的不一樣插槽(任務工做者)之間共享異步資源(如鏈接到hbase,netty鏈接)的狀況,咱們可使鏈接靜態,以便同一進程中的全部線程均可以共享相同的實例。

固然,請在使用這些資源時注意線程安全

用於回調

Example.java
1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/***  ***/
public class HBaseAsyncFunction implements AsyncFunction<String, String> {
  // initialize it while reading object
  transient Connection connection;
 
  @Override
  public void asyncInvoke(String val, AsyncCollector<String> c) {
    Get get = new Get(Bytes.toBytes(val));
    // UserCallback is from user’s async client.
    ((AsyncableHTableInterface) ht).asyncGet(get, new UserCallback(c));
  }
}
 
// create data stream
public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
  DataStream<String> source = getDataStream(env);
  DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
}

對於ListenableFuture

Example2.java
1
2
3
4
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
三十
31
32
33
34
35
36
37
38
 
public class HBaseAsyncFunction implements AsyncFunction<String, String> {
  // initialize it while reading object
  transient Connection connection;
 
  @Override
  public void asyncInvoke(String val, AsyncCollector<String> c) {
    Get get = new Get(Bytes.toBytes(val));
 
    ListenableFuture<Result> future = ht.asyncGet(get);
      new FutureCallback<Result>() {
        @Override public void onSuccess(Result result) {
          List ret = new ArrayList<String>();
          ret.add(result.get(...));
          c.collect(ret);
        }
 
        @Override public void onFailure(Throwable t) {
          c.collect(t);
        }
      },
    );
  }
}
  
// create data stream
public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
  DataStream<String> source = getDataStream(env);
  DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
}

 

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673

相關文章
相關標籤/搜索