Flink的架構

Apache Flink是一個分佈式框架處理引擎,用於對無界和有界數據流進行有狀態計算。Flink運行在全部常見的集羣環境中運行,高效率的執行計算。安全

總體的架構

處理的數據

無界數據流:有一個開始但沒有定義的結束。它們不會在生成時終止並提供數據。必須持續處理無界流,即必須在攝取事件後當即處理事件。沒法等待全部輸入數據到達,由於輸入是無界的,而且在任什麼時候間點都不會完成。處理無界數據一般要求以特定順序(例如事件發生的順序)攝取事件,以便可以推斷結果完整性。(即流數據)數據結構

有界數據流:具備定義的開始和結束。能夠在執行任何計算以前經過攝取全部數據來處理有界流。處理有界流不須要有序攝取,由於能夠始終對有界數據集進行排序。(即批處理)架構

以下圖: 框架

批處理和流處理

和其餘大數據平臺的部署及兼容

Apache Flink是一個分佈式系統,須要計算資源才能執行應用程序。Flink與全部常見的集羣資源管理器(如Hadoop YARN,Apache Mesos和Kubernetes)集成,但也能夠設置爲做爲獨立集羣運行。less

主要處理的數據狀態

Flink旨在以任何規模運行有狀態流應用程序。應用程序能夠並行化爲數千個在集羣中分佈和同時執行的任務。(有狀態和無狀態的區別-> 有狀態對象(Stateful Bean),就是有實例變量的對象,能夠保存數據,是非線程安全的。 無狀態對象(Stateless Bean),就是沒有實例變量的對象,不能保存數據,是不變類,是線程安全的。)異步

對數據狀態的一些優化

有狀態Flink應用程序針對本地狀態訪問進行了優化。任務狀態始終保留在內存中,或者,若是狀態大小超過可用內存,則保存在訪問高效的磁盤上數據結構中。所以,任務經過訪問本地(一般是內存中)狀態來執行全部計算,從而產生很是低的處理延遲。Flink經過按期和異步檢查本地狀態到持久存儲來保證在出現故障時的一次狀態一致性。 以下圖: 分佈式

任務狀態優化

支持的API

Flink提供三種的API:ide

支持三層API

  • SQL和Table API Flink有兩個關係API,Table API和SQL。這兩個API都是用於批處理和流處理的統一API,即,在無界的實時流或有界的記錄流上以相同的語義執行查詢,併產生相同的結果。Table API和SQL利用Apache Calcite進行解析,驗證和查詢優化。它們能夠與DataStream和DataSet API無縫集成,並支持用戶定義的標量,聚合和表值函數。 如下SQL用於對點擊流進行會話並計算每一個會話的點擊次數的SQL查詢:函數

    SELECT userId, COUNT(*)
      FROM clicks
      GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId
    複製代碼
  • ProcessFunctions是Flink提供的最具表現力的功能接口。Flink提供ProcessFunctions來處理來自窗口中分組的一個或兩個輸入流或事件的單個事件。ProcessFunctions提供對時間和狀態的細粒度控制。ProcessFunction能夠任意修改其狀態並註冊將在將來觸發回調函數的定時器。所以,ProcessFunctions能夠實現許多有狀態事件驅動應用程序所需的複雜的每事件業務邏輯。如下示例顯示了KeyedProcessFunction對a KeyedStream和match START以及END事件進行操做的示例。當一個START被接收的事件,則該函數在記住其狀態時間戳和計時在四個小時的計時器。若是END在計時器觸發以前收到事件,則該函數計算事件END和START事件之間的持續時間,清除狀態並返回值。不然,計時器只會觸發並清除狀態。oop

    /**
       * 匹配流入的START和END事件,並計算兩個元素的時間的差;
       *  第一個String字段是鍵屬性,第二個String屬性標記START和END事件。
       */
      public static class StartEndDuration
          extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {
      
        private ValueState<Long> startTime;
      
        @Override
        public void open(Configuration conf) {
          // 獲取狀態處理
          startTime = getRuntimeContext()
            .getState(new ValueStateDescriptor<Long>("startTime", Long.class));
        }
      
        @Override
        public void processElement(
            Tuple2<String, String> in,
            Context ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {
      
          switch (in.f1) {
            case "START":
              // 若是接受到一個開始事件,則設置開始時間
              startTime.update(ctx.timestamp());
              // 註冊一個計時器,從開始時間開始的四個小時內
              ctx.timerService()
                .registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
              break;
            case "END":
              // 發出開始和結束事件之間的持續時間
              Long sTime = startTime.value();
              if (sTime != null) {
                out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
                // 清除狀態
                startTime.clear();
              }
            default:
              // do nothing
          }
        }
      
        /** 計時器觸發時調用 */
        @Override
        public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple2<String, Long>> out) {
      
          // 超時時,清除狀態
          startTime.clear();
        }
      }
    複製代碼
  • DataStream API所述的數據流中的API經過查詢外部數據存儲提供了許多常見的流處理操做。數據流API可用於Java和Scala和基於功能,如map(),reduce()和aggregate()。能夠經過擴展接口或Java或Scala lambda函數來定義函數。 如下示例顯示如何對點擊流進行會話並計算每一個會話的點擊次數。

    // 對點擊流進行會話並計算每一個會話的點擊次數
      DataStream<Click> clicks = ...
      
      DataStream<Tuple2<String, Long>> result = clicks
        .map(
          new MapFunction<Click, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Click click) {
              return Tuple2.of(click.userId, 1L);
            }
          })
        // 定義userId的鍵是0
        .keyBy(0)
        // 定義30分鐘的會話間隙
        .window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
        // 計算每一個會話的點擊數
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));複製代碼
相關文章
相關標籤/搜索