Cris 帶你快速入門 Flink

一 概述

1.1 流處理技術的演變

在開源世界裏,Apache Storm項目是流處理的先鋒。Storm最先由Nathan Marz和創業公司BackType的一個團隊開發,後來才被Apache基金會接納。Storm提供了低延遲的流處理,可是它爲實時性付出了一些代價:很難實現高吞吐,而且其正確性沒能達到一般所需的水平,換句話說,它並不能保證exactly-once,即使是它可以保證的正確性級別,其開銷也至關大。html

在低延遲和高吞吐的流處理系統中維持良好的容錯性是很是困難的,可是爲了獲得有保障的準確狀態,人們想到了一種替代方法:將連續時間中的流數據分割成一系列微小的批量做業。若是分割得足夠小(即所謂的微批處理做業),計算就幾乎能夠實現真正的流處理。由於存在延遲,因此不可能作到徹底實時,可是每一個簡單的應用程序均可以實現僅有幾秒甚至幾亞秒的延遲。這就是在Spark批處理引擎上運行的Spark Streaming所使用的方法。web

更重要的是,使用微批處理方法,能夠實現exactly-once語義,從而保障狀態的一致性。若是一個微批處理失敗了,它能夠從新運行,這比連續的流處理方法更容易。Storm Trident是對Storm的延伸,它的底層流處理引擎就是基於微批處理方法來進行計算的,從而實現了exactly-once語義,可是在延遲性方面付出了很大的代價。redis

對於Storm Trident以及Spark Streaming等微批處理策略,只能根據批量做業時間的倍數進行分割,沒法根據實際狀況分割事件數據,而且,對於一些對延遲比較敏感的做業,每每須要開發者在寫業務代碼時花費大量精力來提高性能。這些靈活性和表現力方面的缺陷,使得這些微批處理策略開發速度變慢,運維成本變高。shell

因而,Flink出現了,這一技術框架能夠避免上述弊端,而且擁有所需的諸多功能,還能按照連續事件高效地處理數據,Flink的部分特性以下圖所示:數據庫

1.2 初識Flink

Flink起源於Stratosphere項目,Stratosphere是在2010~2014年由3所地處柏林的大學和歐洲的一些其餘的大學共同進行的研究項目,2014年4月Stratosphere的代碼被複制並捐贈給了Apache軟件基金會,參加這個孵化項目的初始成員是Stratosphere系統的核心開發人員,2014年12月,Flink一躍成爲Apache軟件基金會的頂級項目。apache

在德語中,Flink一詞表示快速和靈巧,項目採用一隻松鼠的彩色圖案做爲logo,這不只是由於松鼠具備快速和靈巧的特色,還由於柏林的松鼠有一種迷人的紅棕色,而Flink的松鼠logo擁有可愛的尾巴,尾巴的顏色與Apache軟件基金會的logo顏色相呼應,也就是說,這是一隻Apache風格的松鼠。編程

官網連接vim

Flink主頁在其頂部展現了該項目的理念:「Apache Flink是爲分佈式、高性能、隨時可用以及準確的流處理應用程序打造的開源流處理框架」。windows

Apache Flink是一個框架和分佈式處理引擎,用於對無界和有界數據流進行有狀態計算。Flink被設計在全部常見的集羣環境中運行,之內存執行速度和任意規模來執行計算。緩存

1.3 Flink核心計算框架

Flink的核心計算架構是下圖中的Flink Runtime執行引擎,它是一個分佈式系統,可以接受數據流程序並在一臺或多臺機器上以容錯方式執行。

Flink Runtime執行引擎能夠做爲YARN(Yet Another Resource Negotiator)的應用程序在集羣上運行,也能夠在Mesos集羣上運行,還能夠在單機上運行(這對於調試Flink應用程序來講很是有用)。

上圖爲Flink技術棧的核心組成部分,值得一提的是,Flink分別提供了面向流式處理的接口(DataStream API)和麪向批處理的接口(DataSet API)。所以,Flink既能夠完成流處理,也能夠完成批處理。Flink支持的拓展庫涉及機器學習(FlinkML)、復瑣事件處理(CEP)、以及圖計算(Gelly),還有分別針對流處理和批處理的Table API。

Flink 是一個真正的批流結合的大數據計算框架,將大數據背景下的計算統一整合在一塊兒,不只下降了學習和操做難度,也有效實現了離線計算和實時計算的大一統

能被Flink Runtime執行引擎接受的程序很強大,可是這樣的程序有着冗長的代碼,編寫起來也很費力,基於這個緣由,Flink提供了封裝在Runtime執行引擎之上的API,以幫助用戶方便地生成流式計算程序。Flink 提供了用於流處理的DataStream API和用於批處理的DataSet API。值得注意的是,儘管Flink Runtime執行引擎是基於流處理的,可是DataSet API先於DataStream API被開發出來,這是由於工業界對無限流處理的需求在Flink誕生之初並不大。

DataStream API能夠流暢地分析無限數據流,而且能夠用Java或者Scala來實現。開發人員須要基於一個叫DataStream的數據結構來開發,這個數據結構用於表示永不中止的分佈式數據流。

Flink的分佈式特色體如今它可以在成百上千臺機器上運行,它將大型的計算任務分紅許多小的部分,每一個機器執行一部分。Flink可以自動地確保發生機器故障或者其餘錯誤時計算可以持續進行,或者在修復bug或進行版本升級後有計劃地再執行一次。這種能力使得開發人員不須要擔憂運行失敗。Flink本質上使用容錯性數據流,這使得開發人員能夠分析持續生成且永遠不結束的數據(即流處理)。

二 Flink基本架構

2.1 JobManager與TaskManager

Flink運行時包含了兩種類型的處理器:

**JobManager處理器:**也稱之爲Master,用於協調分佈式執行,它們用來調度task,協調檢查點,協調失敗時恢復等。Flink運行時至少存在一個master處理器,若是配置高可用模式則會存在多個master處理器,它們其中有一個是leader,而其餘的都是standby。

TaskManager處理器:也稱之爲Worker,用於執行一個dataflow的task(或者特殊的subtask)、數據緩衝和data stream的交換,Flink運行時至少會存在一個worker處理器。

簡單圖示以下

Master和Worker處理器能夠直接在物理機上啓動,或者經過像YARN這樣的資源調度框架啓動。

Worker鏈接到Master,告知自身的可用性進而得到任務分配。

2.2 無界數據流與有界數據流

Flink用於處理有界和無界數據:

無界數據流無界數據流有一個開始可是沒有結束,它們不會在生成時終止並提供數據,必須連續處理無界流,也就是說必須在獲取後當即處理event。對於無界數據流咱們沒法等待全部數據都到達,由於輸入是無界的,而且在任什麼時候間點都不會完成。處理無界數據一般要求以特定順序(例如事件發生的順序)獲取event,以便可以推斷結果完整性,無界流的處理稱爲流處理

有界數據流有界數據流有明肯定義的開始和結束,能夠在執行任何計算以前經過獲取全部數據來處理有界流,處理有界流不須要有序獲取,由於能夠始終對有界數據集進行排序,有界流的處理也稱爲批處理

在無界數據流和有界數據流中咱們提到了批處理和流處理,這是大數據處理系統中常見的兩種數據處理方式。

批處理的特色是有界、持久、大量,批處理很是適合須要訪問全套記錄才能完成的計算工做,通常用於離線統計流處理的特色是無界、實時,流處理方式無需針對整個數據集執行操做,而是對經過系統傳輸的每一個數據項執行操做,通常用於實時統計

在Spark生態體系中,對於批處理和流處理採用了不一樣的技術框架,批處理由SparkSQL實現,流處理由Spark Streaming實現,這也是大部分框架採用的策略,使用獨立的處理器分別實現批處理和流處理,而Flink能夠同時實現批處理和流處理。

Flink是如何同時實現批處理與流處理的呢?答案是,Flink將批處理(即處理有限的靜態數據)視做一種特殊的流處理

Apache Flink是一個面向分佈式數據流處理和批量數據處理的開源計算平臺,它可以基於同一個Flink運行時(Flink Runtime),提供支持流處理和批處理兩種類型應用的功能。現有的開源計算方案,會把流處理和批處理做爲兩種不一樣的應用類型,由於它們要實現的目標是徹底不相同的:流處理通常須要支持低延遲、Exactly-once保證,而批處理須要支持高吞吐、高效處理,因此在實現的時候一般是分別給出兩套實現方法,或者經過一個獨立的開源框架來實現其中每一種處理方案。例如,實現批處理的開源方案有MapReduce、Tez、Crunch、Spark,實現流處理的開源方案有Samza、Storm。

Flink在實現流處理和批處理時,與傳統的一些方案徹底不一樣,它從另外一個視角看待流處理和批處理,將兩者統一塊兒來:Flink是徹底支持流處理,也就是說做爲流處理看待時輸入數據流是無界的批處理被做爲一種特殊的流處理,只是它的輸入數據流被定義爲有界的。基於同一個Flink運行時(Flink Runtime),分別提供了流處理和批處理API,而這兩種API也是實現上層面向流處理、批處理類型應用框架的基礎。

2.3 數據流編程模型

Flink提供了不一樣級別的抽象,以開發流或批處理做業,以下圖所示:

最底層級的抽象僅僅提供了有狀態流,它將經過過程函數(Process Function)被嵌入到DataStream API中。底層過程函數(Process Function) 與 DataStream API 相集成,使其能夠對某些特定的操做進行底層的抽象,它容許用戶能夠自由地處理來自一個或多個數據流的事件,並使用一致的容錯的狀態。除此以外,用戶能夠註冊事件時間並處理時間回調,從而使程序能夠處理複雜的計算。

實際上,大多數應用並不須要上述的底層抽象,而是針對核心API(Core APIs) 進行編程,好比DataStream API(有界或無界流數據)以及DataSet API(有界數據集)。這些API爲數據處理提供了通用的構建模塊,好比由用戶定義的多種形式的轉換(transformations),鏈接(joins),聚合(aggregations),窗口操做(windows)等等。DataSet API 爲有界數據集提供了額外的支持,例如循環與迭代。這些API處理的數據類型以類(classes)的形式由各自的編程語言所表示。

Table API 以表爲中心,其中表可能會動態變化(在表達流數據時)。Table API遵循(擴展的)關係模型:表有二維數據結構(schema)(相似於關係數據庫中的表),同時API提供可比較的操做,例如select、project、join、group-by、aggregate等。Table API程序聲明式地定義了什麼邏輯操做應該執行,而不是準確地肯定這些操做代碼的看上去如何 。 儘管Table API能夠經過多種類型的用戶自定義函數(UDF)進行擴展,其仍不如核心API更具表達能力,可是使用起來卻更加簡潔(代碼量更少)。除此以外,Table API程序在執行以前會通過內置優化器進行優化。

你能夠在表與 DataStream/DataSet 之間無縫切換,以容許程序將 Table API 與 DataStream 以及 DataSet 混合使用

Flink提供的最高層級的抽象是 SQL 。這一層抽象在語法與表達能力上與 Table API 相似,可是是以SQL查詢表達式的形式表現程序。SQL抽象與Table API交互密切,同時SQL查詢能夠直接在Table API定義的表上執行。

三 Flink集羣搭建

Flink能夠選擇的部署方式有:

Local、Standalone(資源利用率低)、Yarn、Mesos、Docker、Kubernetes、AWS。

咱們主要對Standalone模式和Yarn模式下的Flink集羣部署進行分析。

3.1 Standalone模式安裝

咱們對standalone模式的Flink集羣進行安裝,準備三臺虛擬機,其中一臺做爲JobManager(hadoop101),另外兩臺做爲TaskManager(hadoop10二、hadoop103)。

  1. 首先官網下載

  2. 而後將下載的壓縮包發送到虛擬機上,解壓到指定位置

  3. 而後修改配置文件

    [cris@hadoop101 conf]$ vim flink-conf.yaml
    複製代碼

    而後修改Worker 節點配置

    [cris@hadoop101 conf]$ vim slaves
    複製代碼

  4. 最後將 Flink 同步到其餘兩臺 Worker 節點便可

    [cris@hadoop101 module]$ xsync flink-1.6.1/
    複製代碼
  5. 啓動命令以下

    [cris@hadoop101 bin]$ ./start-cluster.sh
    複製代碼

    很是簡單~

    經過 jps 查看進程狀況

    [cris@hadoop101 bin]$ jpsall 
    ----------jps of hadoop101---------
    2491 StandaloneSessionClusterEntrypoint
    2555 Jps
    ----------jps of hadoop102---------
    2338 Jps
    2285 TaskManagerRunner
    ----------jps of hadoop103---------
    2212 Jps
    2159 TaskManagerRunner
    複製代碼
  6. 訪問集羣web界面(8081端口)

    出現以下界面表示 Flink 集羣啓動成功

  7. 簡單跑個 WC 任務

  8. 關閉集羣

    [cris@hadoop101 bin]$ ./stop-cluster.sh 
    Stopping taskexecutor daemon (pid: 2285) on host hadoop102.
    Stopping taskexecutor daemon (pid: 2159) on host hadoop103.
    Stopping standalonesession daemon (pid: 2491) on host hadoop101.
    [cris@hadoop101 bin]$ jpsall 
    ----------jps of hadoop101---------
    3249 Jps
    ----------jps of hadoop102---------
    2842 Jps
    ----------jps of hadoop103---------
    2706 Jps
    複製代碼

3.2 Yarn模式安裝

前四步同 Standalone 模式

  1. 明確虛擬機中已經設置好了環境變量HADOOP_HOME

  2. 啓動Hadoop集羣(HDFS和Yarn)

  3. 在hadoop101節點提交Yarn-Session,使用安裝目錄下bin目錄中的yarn-session.sh腳本進行提交:

    [cris@hadoop101 ~]$ /opt/module/flink-1.6.1/bin/yarn-session.sh -n 2 -s 6 -jm 1024 -tm 1024 -nm test -d
    複製代碼

    其中:

    -n(--container):TaskManager的數量。

    -s(--slots): 每一個TaskManager的slot數量,默認一個slot一個core,默認每一個taskmanager的slot的個數爲1。

    -jm:JobManager的內存(單位MB)。

    -tm:每一個taskmanager的內存(單位MB)。

    -nm:yarn 的appName(如今yarn的ui上的名字)。

    -d:後臺執行。

  4. 啓動後查看Yarn的Web頁面,能夠看到剛纔提交的會話:

    查看進程信息

  5. 簡單的跑個任務

    [cris@hadoop101 flink-1.6.1]$ ./bin/flink run -m yarn-cluster examples/batch/WordCount.jar
    複製代碼

    終端直接打印結果

    在看看web 界面

四 Flink運行架構

4.1 任務提交流程

Flink任務提交後,Client向HDFS上傳Flink的Jar包和配置,以後向Yarn ResourceManager提交任務,ResourceManager分配Container資源並通知對應的NodeManager啓動ApplicationMaster

ApplicationMaster啓動後加載Flink的Jar包和配置構建環境,而後啓動JobManager,以後ApplicationMaster向ResourceManager申請資源啓動TaskManager,ResourceManager分配Container資源後,由ApplicationMaster通知資源所在節點的NodeManager啓動TaskManager

NodeManager加載Flink的Jar包和配置構建環境並啓動TaskManager,TaskManager啓動後向JobManager發送心跳包,並等待JobManager向其分配任務

4.2 TaskManager與Slots

每個TaskManager是一個JVM進程,它可能會在獨立的線程上執行一個或多個subtask。爲了控制一個worker能接收多少個task,worker經過task slot來進行控制(一個worker至少有一個task slot)。·

每一個task slot表示TaskManager擁有資源的一個固定大小的子集。假如一個TaskManager有三個slot,那麼它會將其管理的內存分紅三份給各個slot。資源slot化意味着一個subtask將不須要跟來自其餘job的subtask競爭被管理的內存,取而代之的是它將擁有必定數量的內存儲備。須要注意的是,這裏不會涉及到CPU的隔離,slot目前僅僅用來隔離task的受管理的內存。

經過調整task slot的數量,容許用戶定義subtask之間如何互相隔離。若是一個TaskManager一個slot,那將意味着每一個task group運行在獨立的JVM中(該JVM多是經過一個特定的容器啓動的),而一個TaskManager多個slot意味着更多的subtask能夠共享同一個JVM。而在同一個JVM進程中的task將共享TCP鏈接(基於多路複用)和心跳消息。它們也可能共享數據集和數據結構,所以這減小了每一個task的負載。

TaskSlot是靜態的概念,是指TaskManager具備的併發執行能力**,能夠經過參數taskmanager.numberOfTaskSlots進行配置,而並行度parallelism是動態概念,即TaskManager運行程序時實際使用的併發能力,能夠經過參數parallelism.default進行配置。

也就是說,假設一共有3個TaskManager,每個TaskManager中的分配3個TaskSlot,也就是每一個TaskManager能夠接收3個task,一共9個TaskSlot,若是咱們設置parallelism.default=1,即運行程序默認的並行度爲1,9個TaskSlot只用了1個,有8個空閒,所以,設置合適的並行度才能提升效率。

4.3 Dataflow

Flink程序由Source、Transformation、Sink這三個核心組件組成,Source主要負責數據的讀取,Transformation主要負責對屬於的轉換操做,Sink負責最終數據的輸出,在各個組件之間流轉的數據稱爲流(streams)。

Flink程序的基礎構建模塊是 (streams) 與 轉換(transformations)(須要注意的是,Flink的DataSet API所使用的DataSets其內部也是stream)。一個stream能夠當作一箇中間結果,而一個transformations是以一個或多個stream做爲輸入的某種operation,該operation利用這些stream進行計算從而產生一個或多個result stream。

在運行時,Flink上運行的程序會被映射成streaming dataflows,它包含了streams和transformations operators。每個dataflow以一個或多個sources開始以一個或多個sinks結束,dataflow相似於任意的有向無環圖(DAG)。

4.4 並行數據流

Flink程序的執行具備並行、分佈式的特性。在執行過程當中,一個 stream 包含一個或多個 stream partition ,而每個 operator 包含一個或多個 operator subtask,這些operator subtasks在不一樣的線程、不一樣的物理機或不一樣的容器中彼此互不依賴得執行。

一個特定operator的subtask的個數被稱之爲其parallelism(並行度)。一個stream的並行度老是等同於其producing operator的並行度。一個程序中,不一樣的operator可能具備不一樣的並行度。

Stream在operator之間傳輸數據的形式能夠是one-to-one(forwarding)的模式也能夠是redistributing的模式,具體是哪種形式,取決於operator的種類。

One-to-onestream(好比在source和map operator之間)維護着分區以及元素的順序。那意味着map operator的subtask看到的元素的個數以及順序跟source operator的subtask生產的元素的個數、順序相同,map、fliter、flatMap等算子都是one-to-one的對應關係。

Redistributing這種操做會改變數據的分區個數。每個operator subtask依據所選擇的transformation發送數據到不一樣的目標subtask。例如,keyBy() 基於hashCode重分區、broadcast和rebalance會隨機從新分區,這些算子都會引發redistribute過程,而redistribute過程就相似於Spark中的shuffle過程。

4.5 task與operatorchains

出於分佈式執行的目的,Flink將operator的subtask連接在一塊兒造成task,每一個task在一個線程中執行。將operators連接成task是很是有效的優化:它能減小線程之間的切換和基於緩存區的數據交換,在減小時延的同時提高吞吐量。連接的行爲能夠在編程API中進行指定。

下面這幅圖,展現了5個subtask以5個並行的線程來執行:

4.6 任務調度流程

客戶端不是運行時和程序執行的一部分,但它用於準備併發送dataflow給Master,而後,客戶端斷開鏈接或者維持鏈接以等待接收計算結果,客戶端能夠以兩種方式運行:要麼做爲Java/Scala程序的一部分被程序觸發執行,要麼以命令行./bin/flink run的方式執行。

五 Flink DataStream API

5.1 Flink運行模型

以上爲Flink的運行模型,Flink的程序主要由三部分構成,分別爲Source、Transformation、Sink。DataSource主要負責數據的讀取,Transformation主要負責對屬於的轉換操做,Sink負責最終數據的輸出。

5.2 Flink程序架構

每一個Flink程序都包含如下的若干流程:

  • 得到一個執行環境;(Execution Environment)

  • 加載/建立初始數據;(Source)

  • 指定轉換這些數據;(Transformation)

  • 指定放置計算結果的位置;(Sink)

  • 觸發程序執行

5.3 Environment

執行環境StreamExecutionEnvironment是全部Flink程序的基礎

建立執行環境有三種方式,分別爲:

StreamExecutionEnvironment.getExecutionEnvironment
StreamExecutionEnvironment.createLocalEnvironment
StreamExecutionEnvironment.createRemoteEnvironment
複製代碼

StreamExecutionEnvironment.getExecutionEnvironment

建立一個執行環境,表示當前執行程序的上下文。 若是程序是獨立調用的,則此方法返回本地執行環境;若是從命令行客戶端調用程序以提交到集羣,則此方法返回此集羣的執行環境,也就是說,getExecutionEnvironment會根據查詢運行的方式決定返回什麼樣的運行環境,是最經常使用的一種建立執行環境的方式。

val env = StreamExecutionEnvironment.getExecutionEnvironment
複製代碼

5.4 Source

I 基於File的數據源

  1. readTextFile(path)

    一列一列的讀取遵循TextInputFormat規範的文本文件,並將結果做爲String返回。

    object Test {
      def main(args: Array[String]): Unit = {
        // 1. 初始化 Flink 執行環境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // 2. 讀取指定路徑的文本文件
        val stream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    
        // 3. action 算子對 DataStream 中的數據打印
        stream.print()
    
        // 4. 啓動 Flink 應用
        executionEnvironment.execute("test")
      }
    }
    複製代碼

    Terminal 打印結果

    1> apache spark hadoop flume
    1> kafka hbase hive flink
    4> apache spark hadoop flink
    5> kafka hbase hive flink
    6> sqoop hue oozie zookeeper
    8> apache spark hadoop flume
    3> kafka hbase oozie zookeeper
    2> sqoop hue oozie zookeeper
    7> flink oozie azakaban spark
    複製代碼

    注意stream.print():每一行前面的數字表明這一行是哪個並行線程輸出的。

    還能夠根據指定的 fileInputFormat 來讀取文件

    readFile(fileInputFormat, path)

  2. 基於Socket的數據源

    從Socket中讀取信息

    object Test {
      def main(args: Array[String]): Unit = {
        // 1. 初始化 Flink 執行環境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        
        val stream: DataStream[String] = executionEnvironment.socketTextStream("localhost", 1234)
    
        // 3. action 算子對 DataStream 中的數據打印
        stream.print()
    
        // 4. 啓動 Flink 應用
        executionEnvironment.execute("test")
      }
    }
    複製代碼

  3. 基於集合(Collection)的數據源

    1. fromCollection(seq):從集合中建立一個數據流,集合中全部元素的類型是一致的

      val stream: DataStream[Int] = executionEnvironment.fromCollection(List(1,2,3,4))
      複製代碼
    2. fromCollection(Iterator):從迭代(Iterator)中建立一個數據流,指定元素數據類型的類由iterator返回

      val stream: DataStream[Int] = executionEnvironment.fromCollection(Iterator(3,1,2))
      複製代碼
    3. fromElements(elements:_*):從一個給定的對象序列中建立一個數據流,全部的對象必須是相同類型

      val list = List(1,2,3)
      val stream: DataStream[List[Int]] = executionEnvironment.fromElements(list)
      複製代碼
    4. generateSequence(from, to):從給定的間隔中並行地產生一個數字序列

      val stream: DataStream[Long] = executionEnvironment.generateSequence(1,10)
      複製代碼

5.5 Sink

Data Sink 消費DataStream中的數據,並將它們轉發到文件、套接字、外部系統或者打印出。

Flink有許多封裝在DataStream操做裏的內置輸出格式。

1. writeAsText

將元素以字符串形式逐行寫入(TextOutputFormat),這些字符串經過調用每一個元素的toString()方法來獲取。

2. WriteAsCsv

將元素以逗號分隔寫入文件中(CsvOutputFormat),行及字段之間的分隔是可配置的。每一個字段的值來自對象的toString()方法。

3. print/printToErr

打印每一個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中。或者也能夠在輸出流中添加一個前綴,這個能夠幫助區分不一樣的打印調用,若是並行度大於1,那麼輸出也會有一個標識由哪一個任務產生的標誌。

4. writeUsingOutputFormat

自定義文件輸出的方法和基類(FileOutputFormat),支持自定義對象到字節的轉換。

5. writeToSocket

將元素寫入到socket中.

5.6 Transformation

1. map

DataStream → DataStream:輸入一個參數產生一個參數。

// 初始化 Flink 執行環境
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    // 針對每一行數據前面添加指定字符串
    val mapDataStream: DataStream[String] = dataStream.map("Apache:" + _)
    mapDataStream.print()

    // 啓動 Flink 應用
    executionEnvironment.execute("test")
複製代碼

2. flatMap

DataStream → DataStream:輸入一個參數,產生0個、1個或者多個輸出。

val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    // 將每行數據按照空格分割成集合,最終 "壓平"
    val mapDataStream: DataStream[String] = dataStream.flatMap(_.split(" "))
    mapDataStream.print()
複製代碼

3. filter

DataStream → DataStream:結算每一個元素的布爾值,並返回布爾值爲true的元素。

val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
val mapDataStream: DataStream[String] = dataStream.filter(_.contains("kafka"))
複製代碼

4. Connect

DataStream,DataStream → ConnectedStreams:鏈接兩個保持他們類型的數據流,兩個數據流被Connect以後,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。

// 初始化 Flink 執行環境
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    val listDataStream: DataStream[Int] = executionEnvironment.fromCollection(List(1, 2, 3))
    val connStreams: ConnectedStreams[String, Int] = dataStream.connect(listDataStream)
    // map函數中的第一個函數做用於 ConnectedStreams 的第一個 DataStream;第二個函數做用於第二個 DataStream
    connStreams.map(e => println(e + "-----"), println(_))

    // 啓動 Flink 應用
    executionEnvironment.execute("test")
複製代碼

測試效果以下:

針對 ConnectedStreams 的map 和 flatMap 操做稱之爲 CoMap,CoFlatMap

做用於ConnectedStreams上,功能與map和flatMap同樣,對ConnectedStreams中的每個Stream分別進行map和flatMap處理。

5. split

DataStream → SplitStream:根據某些特徵把一個DataStream拆分紅兩個或者多個DataStream。

val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    val flatMapDStream: DataStream[String] = dataStream.flatMap(_.split(" "))
    val splitDStream: SplitStream[String] = flatMapDStream.split(e => "hadoop".equals(e) match {
      case true => List("hadoop")
      case false => List("other")
    })

    splitDStream.select("hadoop").print()
複製代碼

一般配合 select 算子使用

6. Union

DataStream → DataStream:對兩個或者兩個以上的DataStream進行union操做,產生一個包含全部DataStream元素的新DataStream。注意:若是你將一個DataStream跟它本身作union操做,在新的DataStream中,你將看到每個元素都出現兩次。

val listDStream: DataStream[Int] = executionEnvironment.fromCollection(List(1,2))
    val unionDStream: DataStream[Int] = listDStream.union(listDStream)
    unionDStream.print()
複製代碼

7. KeyBy

DataStream → KeyedStream:輸入必須是Tuple類型,邏輯地將一個流拆分紅不相交的分區,每一個分區包含具備相同key的元素,在內部以hash的形式實現的

val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    val kvDStream: DataStream[(String, Int)] = dataStream.flatMap(_.split(" ")).map((_, 1))
    val result: KeyedStream[(String, Int), String] = kvDStream.keyBy(_._1)
    result.print()
複製代碼

一般結合 reduce 等聚合算子使用

8. Reduce,Fold,Aggregations

KeyedStream → DataStream:一個分組數據流的聚合操做,合併當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。

val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    val kvDStream: DataStream[(String, Int)] = dataStream.flatMap(_.split(" ")).map((_, 1))
    val result: KeyedStream[(String, Int), String] = kvDStream.keyBy(_._1)
    val reduceDStream: DataStream[(String, Int)] = result.reduce((iter1, iter2) => (iter1._1, iter1._2 + iter2._2))
    reduceDStream.print()
複製代碼

能夠發現,Flink 並非像 Spark 那樣將最後的總的統計結果返回,而是每次聚合統計都將結果返回,因此須要藉助 Flink 的Window 來進行數據的聚合統計(fold 和 aggregation同理)

其實,reduce、fold、aggregation這些聚合算子都是和Window配合使用的,只有配合Window,才能獲得想要的結果.

fold

KeyedStream → DataStream:一個有初始值的分組數據流的滾動摺疊操做,合併當前元素和前一次摺疊操做的結果,併產生一個新的值,返回的流中包含每一次摺疊的結果,而不是隻返回最後一次摺疊的最終結果。

Aggregations

KeyedStream → DataStream:分組數據流上的滾動聚合操做。min和minBy的區別是min返回的是一個最小值,而minBy返回的是其字段中包含最小值的元素(一樣原理適用於max和maxBy),返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。

六 Time 和 Window(重點)

6.1 Time

在Flink的流式處理中,會涉及到時間的不一樣概念,以下圖所示:

Event Time:是事件建立的時間。它一般由事件中的時間戳描述,例如採集的日誌數據中,每一條日誌都會記錄本身的生成時間,Flink經過時間戳分配器訪問事件時間戳。

Ingestion Time:是數據進入Flink的時間。

Processing Time:是每個執行基於時間操做的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。

例如,一條日誌進入Flink的時間爲2017-11-12 10:00:00.123,到達Window的系統時間爲2017-11-12 10:00:01.234,日誌的內容以下:

2017-11-02 18:37:15.624 INFO Fail over to rm2
複製代碼

對於業務來講,要統計1min內的故障日誌個數,哪一個時間是最有意義的?—— eventTime,由於咱們要根據日誌的生成時間進行統計。

一般咱們須要指定日誌中的哪條數據是 eventTime

6.2 Window

Window能夠分紅兩類:

  • CountWindow:按照指定的數據條數生成一個Window,與時間無關。

  • TimeWindow:按照時間生成Window。

對於TimeWindow,能夠根據窗口實現原理的不一樣分紅三類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)。

對於CountWindow 能夠分爲滾動窗口和滑動窗口

1. 滾動窗口(Tumbling Windows)

將數據依據固定的窗口長度對數據進行切片

特色時間對齊,窗口長度固定,沒有重疊

滾動窗口分配器將每一個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,而且不會出現重疊。例如:若是你指定了一個5分鐘大小的滾動窗口,窗口的建立以下圖所示:

適用場景:適合作BI統計等(作每一個時間段的聚合計算)。

2. 滑動窗口(Sliding Windows)

滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成

特色時間對齊,窗口長度固定,有重疊

滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口相似,窗口的大小由窗口大小參數來配置,另外一個窗口滑動參數控制滑動窗口開始的頻率。所以,滑動窗口若是滑動參數小於窗口大小的話,窗口是能夠重疊的,在這種狀況下元素會被分配到多個窗口中。

例如,你有10分鐘的窗口和5分鐘的滑動,那麼每一個窗口中5分鐘的窗口裏包含着上個10分鐘產生的部分數據,以下圖所示:

適用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警)。

3. 會話窗口(Session Windows)

由一系列事件組合一個指定時間長度的timeout間隙組成,相似於web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口

特色時間無對齊

session窗口分配器經過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的狀況,相反,當它在一個固定的時間週期內再也不收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個session窗口經過一個session間隔來配置,這個session間隔定義了非活躍週期的長度,當這個非活躍週期產生,那麼當前的session將關閉而且後續的元素將被分配到新的session窗口中去。

4. Window API

CountWindow

CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果。 注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的全部元素的總數。

  • 滾動窗口

    默認的CountWindow是一個滾動窗口,只須要指定窗口大小便可,當元素數量達到窗口大小時,就會觸發窗口的執行。

    def main(args: Array[String]): Unit = {
        // 初始化 Flink 執行環境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
        val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
          val strings: Array[String] = e.split(" ")
          (strings(0), strings(1).toInt)
        })
        val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
        // 只有等相同key 的元素個數達到3的時候纔會進行 reduce 和 print 操做
        val windowDStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyDStream.countWindow(3)
        val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
        reduceDStream.print()
    
        // 啓動 Flink 應用
        executionEnvironment.execute("test")
      }
    複製代碼

    測試效果以下:

  • 滑動窗口

    滑動窗口和滾動窗口的函數名是徹底一致的,只是在傳參數時須要傳入兩個參數,一個是window_size,一個是sliding_size。

    下面代碼中的sliding_size設置爲了2,也就是說,每收到兩個相同key的數據就計算一次,每一次計算的window範圍是該 key 的前4個元素。

    def main(args: Array[String]): Unit = {
        // 初始化 Flink 執行環境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
        val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
          val strings: Array[String] = e.split(" ")
          (strings(0), strings(1).toInt)
        })
        val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
        // 只有等相同key 的元素個數達到2的時候纔會對該 key 的前4條數據進行 reduce 和 print 操做
        val windowDStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyDStream.countWindow(4,2)
        val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
        reduceDStream.print()
    
        // 啓動 Flink 應用
        executionEnvironment.execute("test")
      }
    }
    複製代碼

TimeWindow

TimeWindow是將指定時間範圍內的全部數據組成一個window,一次對一個window裏面的全部數據進行計算。

  • 滾動窗口

    Flink默認的時間窗口根據Processing Time 進行窗口的劃分,將Flink獲取到的數據根據進入Flink的時間劃分到不一樣的窗口中。

    // 初始化 Flink 執行環境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
        val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
          val strings: Array[String] = e.split(" ")
          (strings(0), strings(1).toInt)
        })
        val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
        // 每3 秒對進入該窗口的全部相同key 的數據進行reduce 和 print 操做
        val windowDStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyDStream.timeWindow(Time.seconds(3))
        val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
        reduceDStream.print()
    
        // 啓動 Flink 應用
        executionEnvironment.execute("test")
    複製代碼

  • 滑動窗口

    滑動窗口和滾動窗口的函數名是徹底一致的,只是在傳參數時須要傳入兩個參數,一個是window_size,一個是sliding_size。

    下面代碼中的sliding_size設置爲了2s,也就是說,窗口每2s就計算一次,每一次計算的window範圍是4s內的全部元素。

    // 初始化 Flink 執行環境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
        val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
          val strings: Array[String] = e.split(" ")
          (strings(0), strings(1).toInt)
        })
        val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
        // 每2 秒對進入該窗口的全部數據進行前 4 秒數據的 reduce 和 print 操做
        val windowDStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyDStream.timeWindow(Time.seconds(4),Time
          .seconds(2))
        val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
        reduceDStream.print()
    
        // 啓動 Flink 應用
        executionEnvironment.execute("test")
    複製代碼

Window Fold

WindowedStream → DataStream:給窗口賦一個fold功能的函數,並返回一個fold後的結果。

// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 建立SocketSource
val stream = env.socketTextStream("localhost", 11111,'\n',3)

// 對stream進行處理並按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)

// 引入滾動窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))

// 執行fold操做
val streamFold = streamWindow.fold(100){
  (begin, item) =>
	begin + item._2
}

// 將聚合數據寫入文件
streamFold.print()

// 執行程序
env.execute("TumblingWindow")
複製代碼
Aggregation on Window

WindowedStream → DataStream:對一個window內的全部元素作聚合操做。min和 minBy的區別是min返回的是最小值,而minBy返回的是包含最小值字段的元素(一樣的原理適用於 max 和 maxBy)。

// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 建立SocketSource
val stream = env.socketTextStream("localhost", 11111)

// 對stream進行處理並按key聚合
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)

// 引入滾動窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))

// 執行聚合操做
val streamMax = streamWindow.max(1)

// 將聚合數據寫入文件
streamMax.print()

// 執行程序
env.execute("TumblingWindow")
複製代碼

七 EventTime與waterMark

7.1 EventTime的引入

在Flink的流式處理中,絕大部分的業務都會使用eventTime,通常只在eventTime沒法使用時,纔會被迫使用ProcessingTime或者IngestionTime

若是要使用EventTime,那麼須要引入EventTime的時間屬性,引入方式以下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 從調用時刻開始給env建立的每個stream追加時間特徵
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
複製代碼

這裏日誌的時間是 Flink 根據咱們的規則去解析生成的eventTime,而不是默認的 processingTime

而window 的時間區間是左閉右開的,及 2019-01-25 00:00:06 時間的日誌會進入第二個window

7.2 Watermark的引入

咱們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分狀況下,流到operator的數據都是按照事件產生的時間順序來的,可是也不排除因爲網絡等緣由,致使亂序的產生,所謂亂序,就是指Flink接收到的事件的前後順序不是嚴格按照事件的Event Time順序排列的。

那麼此時出現一個問題,一旦出現亂序,若是隻根據eventTime決定window的運行,咱們不能明確數據是否所有到位,但又不能無限期的等下去,此時必需要有個機制來保證一個特定的時間後,必須觸發window去進行計算了,這個特別的機制,就是Watermark。

Watermark是一種衡量Event Time進展的機制,它是數據自己的一個隱藏屬性,數據自己攜帶着對應的Watermark。

Watermark是用於處理亂序事件的,而正確的處理亂序事件,一般用Watermark機制結合window來實現。

數據流中的Watermark用於表示timestamp小於Watermark的數據,都已經到達了,所以,window的執行也是由Watermark觸發的。

Watermark能夠理解成一個延遲觸發機制,咱們能夠設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,而後認定eventTime小於maxEventTime - t的全部數據都已經到達,若是有窗口的中止時間等於maxEventTime – t,那麼這個窗口被觸發執行。

我的總結一下:針對進入窗口的每條數據,計算當前全部達到窗口的數據的最大eventTime,將這個eventTime和延遲時間(watermark)作減法,差值若是大於某一個窗口的的結束時間,那麼該窗口就進行算子操做

有序流的Watermarker以下圖所示:(Watermark設置爲0)

亂序流的Watermarker以下圖所示:(Watermark設置爲2)

當Flink接收到每一條數據時,都會產生一條Watermark,這條Watermark就等於當前全部到達數據中的maxEventTime - 延遲時長,也就是說,Watermark是由數據攜帶的,一旦數據攜帶的Watermark比當前未觸發的窗口的中止時間要晚,那麼就會觸發相應窗口的執行。因爲Watermark是由數據攜帶的,所以,若是運行過程當中沒法獲取新的數據,那麼沒有被觸發的窗口將永遠都不被觸發

上圖中,咱們設置的容許最大延遲到達時間爲2s,因此時間戳爲7s的事件對應的Watermark是5s,時間戳爲12s的事件的Watermark是10s,若是咱們的窗口1是1s~5s,窗口2是6s~10s,那麼時間戳爲7s的事件到達時的Watermarker剛好觸發窗口1,時間戳爲12s的事件到達時的Watermark剛好觸發窗口2。

7.3 測試代碼

// 初始化 Flink 執行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 將 Flink 時間由默認的processingTime 設置爲 eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val source: DataStream[String] = env.socketTextStream("localhost", 1234)

    // 設置watermark 以及如何解析每條日誌數據中的eventTime
    val stream: DataStream[String] = source.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
        override def extractTimestamp(element: String): Long = {
          val time: Long = element.split(" ")(0).toLong
          println(time)
          time
        }
      }
    )

    val keyStream: KeyedStream[(String, Int), Tuple] = stream.map(e => (e.split(" ")(1), 1)).keyBy(0)
    // 設置滾動窗口的長度爲5秒,及每5秒的eventTime 間隔計算一次
    val windowStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyStream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
    val reduceStream: DataStream[(String, Int)] = windowStream.reduce(
      (e1, e2) => (e1._1, e1._2 + e2._2)
    )
    reduceStream.print()

    env.execute("test")
  }
複製代碼

測試以下

若是watermark 設置爲2,那麼等到7000(毫秒)以及大於這個時間的日誌進入window 的時候,纔會進行第一個窗口的計算

若是窗口類型設置爲 SlidingEventTimeWindows ,那麼watermark 影響的就是滑動窗口的計算時間,感興趣的能夠本身試試

若是窗口類型設置爲 EventTimeSessionWindows.withGap(Time.seconds(10)),那麼影響的就是相鄰兩條數據的時間間隔必須大於指定時間纔會觸發計算

八 總結

Flink是一個真正意義上的流計算引擎,在知足低延遲和低容錯開銷的基礎之上,完美的解決了exactly-once的目標,真是因爲Flink具備諸多優勢,愈來愈多的企業開始使用Flink做爲流處理框架,逐步替換掉了本來的Storm和Spark技術框架。

相關文章
相關標籤/搜索