Apache Flink 零基礎入門(三):DataStream API 編程

做者:崔星燦
整理:高贇算法

前面已經爲你們介紹了 Flink 的基本概念以及安裝部署的過程,從而但願可以幫助讀者創建起對 Flink 的初步印象。本次課程開始,咱們將進入第二部分,即 Flink 實際開發的相關內容。本次課程將首先介紹 Flink 開發中比較核心的 DataStream API 。咱們首先將回顧分佈式流處理的一些基本概念,這些概念對於理解實際的 DataStream API 有很是大的做用。而後,咱們將詳細介紹 DataStream API 的設計,最後咱們將經過一個例子來演示 DataStream API 的使用。數據庫

1. 流處理基本概念

對於什麼是流處理,從不一樣的角度有不一樣的定義。其實流處理與批處理這兩個概念是對立統一的,它們的關係有點相似於對於 Java 中的 ArrayList 中的元素,是直接看做一個有限數據集並用下標去訪問,仍是用迭代器去訪問。編程

_04

圖1. 左圖硬幣分類器。硬幣分類器也能夠看做一個流處理系統,用於硬幣分類的各部分組件提早串聯在一塊兒,硬幣不斷進入系統,並最終被輸出到不一樣的隊列中供後續使用。右圖同理。

流處理系統自己有不少本身的特色。通常來講,因爲須要支持無限數據集的處理,流處理系統通常採用一種數據驅動的處理方式。它會提早設置一些算子,而後等到數據到達後對數據進行處理。爲了表達複雜的計算邏輯,包括 Flink 在內的分佈式流處理引擎通常採用 DAG 圖來表示整個計算邏輯,其中 DAG 圖中的每個點就表明一個基本的邏輯單元,也就是前面說的算子。因爲計算邏輯被組織成有向圖,數據會按照邊的方向,從一些特殊的 Source 節點流入系統,而後經過網絡傳輸、本地傳輸等不一樣的數據傳輸方式在算子之間進行發送和處理,最後會經過另一些特殊的 Sink 節點將計算結果發送到某個外部系統或數據庫中。網絡

_05

圖2. 一個 DAG 計算邏輯圖與實際的物理時模型。邏輯圖中的每一個算子在物理圖中可能有多個併發。

對於實際的分佈式流處理引擎,它們的實際運行時物理模型要更復雜一些,這是因爲每一個算子均可能有多個實例。如圖 2 所示,做爲 Source 的 A 算子有兩個實例,中間算子 C 也有兩個實例。在邏輯模型中,A 和 B 是 C 的上游節點,而在對應的物理邏輯中,C 的全部實例和 A、B 的全部實例之間可能都存在數據交換。在物理模型中,咱們會根據計算邏輯,採用系統自動優化或人爲指定的方式將計算工做分佈到不一樣的實例中。只有當算子實例分佈到不一樣進程上時,才須要經過網絡進行數據傳輸,而同一進程中的多個實例之間的數據傳輸一般是不須要經過網絡的。併發

表1. Apache Storm 構造 DAG 計算圖。Apache Storm 的接口定義更加「面向操做」,所以更加底層。
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
表2. Apache Flink 構造 DAG 計算圖。Apache Flink 的接口定義更加「面向數據」,所以更加高層。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile ("input");
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
counts.writeAsText("output");

因爲流處理的計算邏輯是經過 DAG 圖來表示的,所以它們的大部分 API 都是圍繞構建這種計算邏輯圖來設計的。例如,對於幾年前很是流行的 Apache Storm,它的 Word Count 的示例如表 1 所示。基於 Apache Storm 用戶須要在圖中添加 Spout 或 Bolt 這種算子,並指定算子以前的鏈接方式。這樣,在完成整個圖的構建以後,就能夠將圖提交到遠程或本地集羣運行。框架

與之對比,Apache Flink 的接口雖然也是在構建計算邏輯圖,可是 Flink 的 API 定義更加面向數據自己的處理邏輯,它把數據流抽象成爲一個無限集,而後定義了一組集合上的操做,而後在底層自動構建相應的 DAG 圖。能夠看出,Flink 的 API 要更「上層」一些。許多研究者在進行實驗時,可能會更喜歡自由度高的 Storm,由於它更容易保證明現預想的圖結構;而在工業界則更喜歡 Flink 這類高級 API,由於它使用更加簡單。dom

2. Flink DataStream API 概覽

基於前面對流處理的基本概念,本節將詳細介紹 Flink DataStream API 的使用方式。咱們首先仍是從一個簡單的例子開始看起。表3是一個流式 Word Count 的示例,雖然它只有 5 行代碼,可是它給出了基於 Flink DataStream API 開發程序的基本結構。分佈式

表3. 基於 Flink DataStream API 的 Word Count 示例.
//一、設置運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//二、配置數據源讀取數據
DataStream<String> text = env.readTextFile ("input");
//三、進行一系列轉換
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
//四、配置數據匯寫出數據
counts.writeAsText("output");
//五、提交執行
env.execute("Streaming WordCount");

爲了實現流式 Word Count,咱們首先要先得到一個 StreamExecutionEnvironment 對象。它是咱們構建圖過程當中的上下文對象。基於這個對象,咱們能夠添加一些算子。對於流處理程度,咱們通常須要首先建立一個數據源去接入數據。在這個例子中,咱們使用了 Environment 對象中內置的讀取文件的數據源。這一步以後,咱們拿到的是一個 DataStream 對象,它能夠看做一個無限的數據集,能夠在該集合上進行一序列的操做。例如,在 Word Count 例子中,咱們首先將每一條記錄(即文件中的一行)分隔爲單詞,這是經過 FlatMap 操做來實現的。調用 FlatMap 將會在底層的 DAG 圖中添加一個 FlatMap 算子。而後,咱們獲得了一個記錄是單詞的流。咱們將流中的單詞進行分組(keyBy),而後累積計算每個單詞的數據(sum(1))。計算出的單詞的數據組成了一個新的流,咱們將它寫入到輸出文件中。ide

最後,咱們須要調用 env#execute 方法來開始程序的執行。須要強調的是,前面咱們調用的全部方法,都不是在實際處理數據,而是在構通表達計算邏輯的 DAG 圖。只有當咱們將整個圖構建完成並顯式的調用 Execute 方法後,框架纔會把計算圖提供到集羣中,接入數據並執行實際的邏輯。優化

基於流式 Word Count 的例子能夠看出,基於 Flink 的 DataStream API 來編寫流處理程序通常須要三步:經過 Source 接入數據、進行一系統列的處理以及將數據寫出。最後,不要忘記顯式調用 Execute 方式,不然前面編寫的邏輯並不會真正執行。

_09

圖3. Flink DataStream 操做概覽

從上面的例子中還能夠看出,Flink DataStream API 的核心,就是表明流數據的 DataStream 對象。整個計算邏輯圖的構建就是圍繞調用 DataStream 對象上的不一樣操做產生新的 DataStream 對象展開的。總體來講,DataStream 上的操做能夠分爲四類。第一類是對於單條記錄的操做,好比篩除掉不符合要求的記錄(Filter 操做),或者將每條記錄都作一個轉換(Map 操做)。第二類是對多條記錄的操做。好比說統計一個小時內的訂單總成交量,就須要將一個小時內的全部訂單記錄的成交量加到一塊兒。爲了支持這種類型的操做,就得經過 Window 將須要的記錄關聯到一塊兒進行處理。第三類是對多個流進行操做並轉換爲單個流。例如,多個流能夠經過 Union、Join 或 Connect 等操做合到一塊兒。這些操做合併的邏輯不一樣,可是它們最終都會產生了一個新的統一的流,從而能夠進行一些跨流的操做。最後, DataStream 還支持與合併對稱的操做,即把一個流按必定規則拆分爲多個流(Split 操做),每一個流是以前流的一個子集,這樣咱們就能夠對不一樣的流做不一樣的處理。

_10

圖4. 不一樣類型的 DataStream 子類型。不一樣的子類型支持不一樣的操做集合。

爲了支持這些不一樣的流操做,Flink 引入了一組不一樣的流類型,用來表示某些操做的中間流數據集類型。完整的類型轉換關係如圖4所示。首先,對於一些針對單條記錄的操做,如 Map 等,操做的結果仍然是是基本的 DataStream 類型。而後,對於 Split 操做,它會首先產生一個 SplitStream,基於 SplitStream 可使用 Select 方法來篩選出符合要求的記錄並再將獲得一個基本的流。

相似的,對於 Connect 操做,在調用 streamA.connect(streamB)後能夠獲得一個專門的 ConnectedStream。ConnectedStream 支持的操做與普通的 DataStream 有所區別,因爲它表明兩個不一樣的流混合的結果,所以它容許用戶對兩個流中的記錄分別指定不一樣的處理邏輯,而後它們的處理結果造成一個新的 DataStream 流。因爲不一樣記錄的處理是在同一個算子中進行的,所以它們在處理時能夠方便的共享一些狀態信息。上層的一些 Join 操做,在底層也是須要依賴於 Connect 操做來實現的。

另外,如前所述,咱們能夠經過 Window 操做對流能夠按時間或者個數進行一些切分,從而將流切分紅一個個較小的分組。具體的切分邏輯能夠由用戶進行選擇。當一個分組中全部記錄都到達後,用戶能夠拿到該分組中的全部記錄,從而能夠進行一些遍歷或者累加操做。這樣,對每一個分組的處理均可以獲得一組輸出數據,這些輸出數據造成了一個新的基本流。

對於普通的 DataStream,咱們必須使用 allWindow 操做,它表明對整個流進行統一的 Window 處理,所以是不能使用多個算子實例進行同時計算的。針對這一問題,就須要咱們首先使用 KeyBy 方法對記錄按 Key 進行分組,而後才能夠並行的對不一樣 Key 對應的記錄進行單獨的 Window 操做。KeyBy 操做是咱們平常編程中最重要的操做之一,下面咱們會更詳細的介紹。

_11

圖5. 基本流上的 Window 操做與 KeyedStream 上的 Window 操對比。KeyedStream 上的 Window 操做使採用多個實例併發處理成爲了可能。

基本 DataStream 對象上的 allWindow 與 KeyedStream 上的 Window 操做的對好比圖5所示。爲了可以在多個併發實例上並行的對數據進行處理,咱們須要經過 KeyBy 將數據進行分組。KeyBy 和 Window 操做都是對數據進行分組,可是 KeyBy 是在水平分向對流進行切分,而 Window 是在垂直方式對流進行切分。

使用 KeyBy 進行數據切分以後,後續算子的每個實例能夠只處理特定 Key 集合對應的數據。除了處理自己外,Flink 中容許算子維護一部分狀態(State),在KeyedStream 算子的狀態也是能夠分佈式存儲的。因爲 KeyBy 是一種肯定的數據分配方式(下文將介紹其它分配方式),所以即便發生 Failover 做業重啓,甚至發生了併發度的改變,Flink 均可以從新分配 Key 分組並保證處理某個 Key 的分組必定包含該 Key 的狀態,從而保證一致性。

最後須要強調的是,KeyBy 操做只有當 Key 的數量超過算子的併發實例數才能夠較好的工做。因爲同一個 Key 對應的全部數據都會發送到同一個實例上,所以若是Key 的數量比實例數量少時,就會致使部分實例收不到數據,從而致使計算能力不能充分發揮。

3. 其它問題

除 KeyBy 以外,Flink 在算子以前交換數據時還支持其它的物理分組方式。如圖 1 所示,Flink DataStream 中物理分組方式包括:

  • Global: 上游算子將全部記錄發送給下游算子的第一個實例。
  • Broadcast: 上游算子將每一條記錄發送給下游算子的全部實例。
  • Forward:只適用於上游算子實例數與下游算子相同時,每一個上游算子實例將記錄發送給下游算子對應的實例。
  • Shuffle:上游算子對每條記錄隨機選擇一個下游算子進行發送。
  • Rebalance:上游算子經過輪詢的方式發送數據。
  • Rescale:當上遊和下游算子的實例數爲 n 或 m 時,若是 n < m,則每一個上游實例向ceil(m/n)或floor(m/n)個下游實例輪詢發送數據;若是 n > m,則 floor(n/m) 或 ceil(n/m) 個上游實例向下遊實例輪詢發送數據。
  • PartitionCustomer:當上述內置分配方式不知足需求時,用戶還能夠選擇自定義分組方式。

_13

圖6. 除keyBy外其它的物理分組方式。

除分組方式外,Flink DataStream API 中另外一個重要概念就是類型系統。圖 7 所示,Flink DataStream 對像都是強類型的,每個 DataStream 對象都須要指定元素的類型,Flink 本身底層的序列化機制正是依賴於這些信息對序列化等進行優化。具體來講,在 Flink 底層,它是使用 TypeInformation 對象對類型進行描述的,TypeInformation 對象定義了一組類型相關的信息供序列化框架使用。

_14

圖7. Flink DataStream API 中的類型系統

Flink 內置了一部分經常使用的基本類型,對於這些類型,Flink 也內置了它們的TypeInformation,用戶通常能夠直接使用而不須要額外的聲明,Flink 本身能夠經過類型推斷機制識別出相應的類型。可是也會有一些例外的狀況,好比,Flink DataStream API 同時支持 Java 和 Scala,Scala API 許多接口是經過隱式的參數來傳遞類型信息的,因此若是須要經過 Java 調用 Scala 的 API,則須要把這些類型信息經過隱式參數傳遞過去。另外一個例子是 Java 中對泛型存在類型擦除,若是流的類型自己是一個泛型的話,則可能在擦除以後沒法推斷出類型信息,這時候也須要顯式的指定。

在 Flink 中,通常 Java 接口採用 Tuple 類型來組合多個字段,而 Scala 則更常用 Row 類型或 Case Class。相對於 Row,Tuple 類型存在兩個問題,一個是字段個數不能超過 25 個,此外,全部字段不容許有 null 值。最後,Flink 也支持用戶自定義新的類型和 TypeInformation,並經過 Kryo 來實現序列化,可是這種方式可帶來一些遷移等方面的問題,因此儘可能不要使用自定義的類型。

4.示例

而後,咱們再看一個更復雜的例子。假設咱們有一個數據源,它監控系統中訂單的狀況,當有新訂單時,它使用 Tuple2<String, Integer> 輸出訂單中商品的類型和交易額。而後,咱們但願實時統計每一個類別的交易額,以及實時統計所有類別的交易額。

表4. 實時訂單統計示例。

public class GroupedProcessingTimeWindowSample {
    private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
            Random random = new Random();
            while (isRunning) {
                Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 5);
                String key = "類別" + (char) ('A' + random.nextInt(3));
                int value = random.nextInt(10) + 1;

                System.out.println(String.format("Emits\t(%s, %d)", key, value));
                ctx.collect(new Tuple2<>(key, value));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStream<Tuple2<String, Integer>> ds = env.addSource(new DataSource());
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = ds.keyBy(0);

        keyedStream.sum(1).keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
            @Override
            public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return "";
            }
        }).fold(new HashMap<String, Integer>(), new FoldFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> fold(HashMap<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
                accumulator.put(value.f0, value.f1);
                return accumulator;
            }
        }).addSink(new SinkFunction<HashMap<String, Integer>>() {
            @Override
            public void invoke(HashMap<String, Integer> value, Context context) throws Exception {
                  // 每一個類型的商品成交量
                  System.out.println(value);
                  // 商品成交總量                
                  System.out.println(value.values().stream().mapToInt(v -> v).sum());
            }
        });

        env.execute();
    }
}

示例的實現如表4所示。首先,在該實現中,咱們首先實現了一個模擬的數據源,它繼承自 RichParallelSourceFunction,它是能夠有多個實例的 SourceFunction 的接口。它有兩個方法須要實現,一個是 Run 方法,Flink 在運行時對 Source 會直接調用該方法,該方法須要不斷的輸出數據,從而造成初始的流。在 Run 方法的實現中,咱們隨機的產生商品類別和交易量的記錄,而後經過 ctx#collect 方法進行發送。另外一個方法是 Cancel 方法,當 Flink 須要 Cancel Source Task 的時候會調用該方法,咱們使用一個 Volatile 類型的變量來標記和控制執行的狀態。

而後,咱們在 Main 方法中就能夠開始圖的構建。咱們首先建立了一個 StreamExecutioniEnviroment 對象。建立對象調用的 getExecutionEnvironment 方法會自動判斷所處的環境,從而建立合適的對象。例如,若是咱們在 IDE 中直接右鍵運行,則會建立 LocalStreamExecutionEnvironment 對象;若是是在一個實際的環境中,則會建立 RemoteStreamExecutionEnvironment 對象。

基於 Environment 對象,咱們首先建立了一個 Source,從而獲得初始的<商品類型,成交量>流。而後,爲了統計每種類別的成交量,咱們使用 KeyBy 按 Tuple 的第 1 個字段(即商品類型)對輸入流進行分組,並對每個 Key 對應的記錄的第 2 個字段(即成交量)進行求合。在底層,Sum 算子內部會使用 State 來維護每一個Key(即商品類型)對應的成交量之和。當有新記錄到達時,Sum 算子內部會更新所維護的成交量之和,並輸出一條<商品類型,更新後的成交量>記錄。

若是隻統計各個類型的成交量,則程序能夠到此爲止,咱們能夠直接在 Sum 後添加一個 Sink 算子對不斷更新的各種型成交量進行輸出。可是,咱們還須要統計全部類型的總成交量。爲了作到這一點,咱們須要將全部記錄輸出到同一個計算節點的實例上。咱們能夠經過 KeyBy 而且對全部記錄返回同一個 Key,將全部記錄分到同一個組中,從而能夠所有發送到同一個實例上。

而後,咱們使用 Fold 方法來在算子中維護每種類型商品的成交量。注意雖然目前 Fold 方法已經被標記爲 Deprecated,可是在 DataStream API 中暫時尚未能替代它的其它操做,因此咱們仍然使用 Fold 方法。這一方法接收一個初始值,而後當後續流中每條記錄到達的時候,算子會調用所傳遞的 FoldFunction 對初始值進行更新,併發送更新後的值。咱們使用一個 HashMap 來對各個類別的當前成交量進行維護,當有一條新的<商品類別,成交量>到達時,咱們就更新該 HashMap。這樣在 Sink 中,咱們收到的是最新的商品類別和成交量的 HashMap,咱們能夠依賴這個值來輸出各個商品的成交量和總的成交量。

須要指出的是,這個例子主要是用來演示 DataStream API 的用法,實際上還會有更高效的寫法,此外,更上層的 Table / SQL 還支持 Retraction 機制,能夠更好的處理這種狀況。

_17

圖8. API 原理圖

最後,咱們對 DataStream API 的原理進行簡要的介紹。當咱們調用 DataStream#map 算法時,Flink 在底層會建立一個 Transformation 對象,這一對象就表明咱們計算邏輯圖中的節點。它其中就記錄了咱們傳入的 MapFunction,也就是 UDF(User Define Function)。隨着咱們調用更多的方法,咱們建立了更多的 DataStream 對象,每一個對象在內部都有一個 Transformation 對象,這些對象根據計算依賴關係組成一個圖結構,就是咱們的計算圖。後續 Flink 將對這個圖結構進行進一步的轉換,從而最終生成提交做業所須要的 JobGraph。

5. 總結

本文主要介紹了 Flink DataStream API,它是當前 Flink 中比較底層的一套 API。在實際的開發中,基於該 API 須要用戶本身處理 State 與 Time 等一些概念,所以須要較大的工做量。後續課程還會介紹更上層的 Table / SQL 層的 API,將來 Table / SQL 可能會成爲 Flink 主流的 API,可是對於接口來講,越底層的接口表達能力越強,在一些須要精細操做的狀況下,仍然須要依賴於 DataStream API。

相關文章
相關標籤/搜索