【轉】Apache Flink介紹

一、Apache Flink介紹

既然有了Apache Spark,爲何還要使用Apache Flink?css

由於Flink是一個純流式計算引擎,而相似於Spark這種微批的引擎,只是Flink流式引擎的一個特例。其餘的不一樣點以後會陸續談到。html

1.1 歷史

Flink起源於一個叫作Stratosphere的研究項目,目標是創建下一代大數據分析引擎,其在2014年4月16日成爲Apache的孵化項目,從Stratosphere 0.6開始,正式改名爲Flink。Flink 0.7中介紹了最重要的特性:Streaming API。最初只支持Java API,後來增長了Scala API。java

1.2 架構

Flink 1.X版本的包含了各類各樣的組件,包括部署、flink core(runtime)以及API和各類庫。git

這裏寫圖片描述

從部署上講,Flink支持local模式、集羣模式(standalone集羣或者Yarn集羣)、雲端部署。Runtime是主要的數據處理引擎,它以JobGraph形式的API接收程序,JobGraph是一個簡單的並行數據流,包含一系列的tasks,每一個task包含了輸入和輸出(source和sink例外)。github

DataStream API和DataSet API是流處理和批處理的應用程序接口,當程序在編譯時,生成JobGraph。編譯完成後,根據API的不一樣,優化器(批或流)會生成不一樣的執行計劃。根據部署方式的不一樣,優化後的JobGraph被提交給了executors去執行。算法

1.3 分佈式執行

Flink分佈式程序包含2個主要的進程:JobManager和TaskManager.當程序運行時,不一樣的進程就會參與其中,包括Jobmanager、TaskManager和JobClient。apache

這裏寫圖片描述

首先,Flink程序提交給JobClient,JobClient再提交到JobManager,JobManager負責資源的協調和Job的執行。一旦資源分配完成,task就會分配到不一樣的TaskManager,TaskManager會初始化線程去執行task,並根據程序的執行狀態向JobManager反饋,執行的狀態包括starting、in progress、finished以及canceled和failing等。當Job執行完成,結果會返回給客戶端。bootstrap

1.3.1 JobManager

Master進程,負責Job的管理和資源的協調。包括任務調度,檢查點管理,失敗恢復等。windows

固然,對於集羣HA模式,能夠同時多個master進程,其中一個做爲leader,其餘做爲standby。當leader失敗時,會選出一個standby的master做爲新的leader(經過zookeeper實現leader選舉)。 
JobManager包含了3個重要的組件:緩存

  1.  
    (1)Actor系統
  2.  
    (2)調度
  3.  
    (3)檢查點
  • 1
  • 2
  • 3

1.3.1.1 Actor系統

Flink內部使用Akka模型做爲JobManager和TaskManager之間的通訊機制。

Actor系統是個容器,包含許多不一樣的Actor,這些Actor扮演者不一樣的角色。Actor系統提供相似於調度、配置、日誌等服務,同時包含了全部actors初始化時的線程池。

全部的Actors存在着層級的關係。新加入的Actor會被分配一個父類的Actor。Actors之間的通訊採用一個消息系統,每一個Actor都有一個「郵箱」,用於讀取消息。若是Actors是本地的,則消息在共享內存中共享;若是Actors是遠程的,則消息經過RPC遠程調用。

每一個父類的Actor都負責監控其子類Actor,當子類Actor出現錯誤時,本身先嚐試重啓並修復錯誤;若是子類Actor不能修復,則將問題升級並由父類Actor處理。 
在Flink中,actor是一個有狀態和行爲的容器。Actor的線程持續的處理從「郵箱」中接收到的消息。Actor中的狀態和行爲則由收到的消息決定。

這裏寫圖片描述

1.3.1.2 調度器

Flink中的Executors被定義爲task slots(線程槽位)。每一個Task Manager須要管理一個或多個task slots。

Flink經過SlotSharingGroup和CoLocationGroup來決定哪些task須要被共享,哪些task須要被單獨的slot使用。

1.3.1.3 檢查點

Flink的檢查點機制是保證其一致性容錯功能的骨架。它持續的爲分佈式的數據流和有狀態的operator生成一致性的快照。其改良自Chandy-Lamport算法,叫作ABS(輕量級異步Barrier快照),具體參見論文: 
Lightweight Asynchronous Snapshots for Distributed Dataflows

Flink的容錯機制持續的構建輕量級的分佈式快照,所以負載很是低。一般這些有狀態的快照都被放在HDFS中存儲(state backend)。程序一旦失敗,Flink將中止executor並從最近的完成了的檢查點開始恢復(依賴可重發的數據源+快照)。

Barrier做爲一種Event,是Flink快照中最主要的元素。它會隨着data record一塊兒被注入到流數據中,並且不會超越data record。每一個barrier都有一個惟一的ID,將data record分到不一樣的檢查點的範圍中。下圖展現了barrier是如何被注入到data record中的: 
這裏寫圖片描述

每一個快照中的狀態都會報告給Job Manager的檢查點協調器;快照發生時,flink會在某些有狀態的operator上對data record進行對齊操做(alignment),目的是避免失敗恢復時重複消費數據。這個過程也是exactly once的保證。一般對齊操做的時間僅是毫秒級的。可是對於某些極端的應用,在每一個operator上產生的毫秒級延遲也不能容許的話,則能夠選擇降級到at least once,即跳過對齊操做,當失敗恢復時可能發生重複消費數據的狀況。Flink默認採用exactly once意義的處理。

1.3.2 TaskManager

Task Managers是具體執行tasks的worker節點,執行發生在一個JVM中的一個或多個線程中。Task的並行度是由運行在Task Manager中的task slots的數量決定。若是一個Task Manager有4個slots,那麼JVM的內存將分配給每一個task slot 25%的內存。一個Task slot中能夠運行1個或多個線程,同一個slot中的線程又能夠共享相同的JVM。在相同的JVM中的tasks,會共享TCP鏈接和心跳消息: 
這裏寫圖片描述

1.3.3 Job Client

Job Client並非Flink程序執行中的內部組件,而是程序執行的入口。Job Client負責接收用戶提交的程序,並建立一個data flow,而後將生成的data flow提交給Job Manager。一旦執行完成,Job Client將返回給用戶結果。

Data flow就是執行計劃,好比下面一個簡單的word count的程序:

這裏寫圖片描述

當用戶將這段程序提交時,Job Client負責接收此程序,並根據operator生成一個data flow,那麼這個程序生成的data flow也許看起來像是這個樣子:

這裏寫圖片描述

默認狀況下,Flink的data flow都是分佈式並行處理的,對於數據的並行處理,flink將operators和數據流進行partition。Operator partitions叫作sub-tasks。數據流又能夠分爲一對一的傳輸與重分佈的狀況。

這裏寫圖片描述

咱們看到,從source到map的data flow,是一個一對一的關係,不必產生shuffle操做;而從map到groupBy操做,flink會根據key將數據重分佈,即shuffle操做,目的是聚合數據,產生正確的結果。

1.4 特性

1.4.1 高性能

Flink自己就被設計爲高性能和低延遲的引擎。不像Spark這種框架,你沒有必要作許多手動的配置,用以得到最佳性能,Flink管道式(pipeline)的數據處理方式已經給了你最佳的性能。

1.4.2 有狀態的支持Exactly once的計算

經過檢查點+可重發的數據源,使得Flink對於stateful的operator,支持exactly once的計算。固然你能夠選擇降級到at least once。

1.4.3 靈活的流處理窗口

Flink支持數據驅動的窗口,這意味着咱們能夠基於時間(event time或processing time)、count和session來構建窗口;窗口同時能夠定製化,經過特定的pattern實現。

1.4.4 容錯機制

經過輕量級、分佈式快照實現。

1.4.5 內存管理

Flink在JVM內部進行內存的自我管理,使得其獨立於java自己的垃圾回收機制。當處理hash、index、caching和sorting時,Flink自個人內存管理方式使得這些操做很高效。可是,目前自個人內存管理只在批處理中實現,流處理程序並未使用。

1.4.6 優化器

爲了不shuffle、sort等操做,Flink的批處理API進行了優化,它能夠確保避免過分的磁盤IO而儘量使用緩存。

1.4.7 流和批的統一

Flink中批和流有各自的API,你既能夠開發批程序,也能夠開發流處理程序。事實上,Flink中的流處理優先原則,認爲批處理是流處理的一種特殊狀況。

1.4.8 Libraries庫

Flink提供了用於機器學習、圖計算、Table API等庫,同時Flink也支持複雜的CEP處理和警告。

1.4.9 Event Time語義

Flink支持Event Time語義的處理,這有助於處理流計算中的亂序問題,有些數據也許會遲到,咱們能夠經過基於event time、count、session的窗口來處於這樣的場景。

1.5 快速安裝

直接參見官方文檔:QuickStart

1.6 Standalone 集羣安裝

直接參見官方文檔:Standalone Cluster

1.7 例子

略去,可參見官方文檔:Examples

1.8 總結

Flink細節上的討論和處理模型。下一章將介紹Flink Streaming API。

二、用DataStream API處理數據

許多領域須要數據的實時處理,物聯網驅動的應用程序在數據的存儲、處理和分析上須要實時或準實時的進行。

Flink提供流處理的API叫作DataStream API,每一個Flink程序均可以按照下面的步驟進行開發:

這裏寫圖片描述

2.1 運行環境

咱們首先要得到已經存在的運行環境或者建立它。有3種方法獲得運行環境:

  1.  
    1)經過getExecutionEnvironment()得到;這將根據上下文獲得運行環境,假如local模式,則它會建立一個local的運行環境;假如是集羣模式,則會建立一個分佈式的運行環境;
  2.  
    2)經過createLocalEnvironment() 建立一個本地的運行環境;
  3.  
    3)經過createRemoteEnvironment (String host, int port, String, and .jar files)建立一個遠程的運行環境。
  • 1
  • 2
  • 3

2.2 數據源

Flink支持許多預約義的數據源,同時也支持自定義數據源。下面咱們看看有哪些預約義的數據源。

2.2.1 基於socket

DataStream API支持從socket讀取數據,有以下3個方法:

  1.  
    socketTextStream(hostName, port);
  2.  
    socketTextStream(hostName, port,delimiter)
  3.  
    socketTextStream(hostName, port,delimiter, maxRetry)
  • 1
  • 2
  • 3

2.2.2 基於文件

你可使用readTextFile(String path)來消費文件中的數據做爲流數據的來源,默認狀況下的格式是TextInputFormat。固然你也能夠經過readFile(FileInputFormat inputFormat, String path)來指定FileInputFormat的格式。

Flink一樣支持讀取文件流:

  1.  
    readFileStream(String filePath, long intervalMillis,
  2.  
    FileMonitoringFunction.WatchType watchType)
  3.  
     
  4.  
    readFile(fileInputFormat, path, watchType, interval, pathFilter,
  5.  
    typeInfo)。
  • 1
  • 2
  • 3
  • 4
  • 5

關於基於文件的數據流,這裏再也不過多介紹。

2.2.3 Transformation

Transformation容許將數據從一種形式轉換爲另外一種形式,輸入能夠是1個源也能夠是多個,輸出則能夠是0個、1個或者多個。下面咱們一一介紹這些Transformations。

2.2.3.1 Map

輸入1個元素,輸出一個元素,Java API以下:

  1.  
    inputStream .map(new MapFunction<Integer, Integer>() {
  2.  
    @Override
  3.  
    public Integer map(Integer value) throws Exception {
  4.  
    return 5 * value;
  5.  
    }
  6.  
    });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2.2.3.2 FlatMap

輸入1個元素,輸出0個、1個或多個元素,Java API以下:

  1.  
    inputStream.flatMap( new FlatMapFunction<String, String>() {
  2.  
    @ Override
  3.  
    public void flatMap(String value, Collector<String> out)
  4.  
    throws Exception {
  5.  
    for(String word: value.split(" ")){
  6.  
    out.collect(word);
  7.  
    }
  8.  
    }
  9.  
    });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2.2.3.3 Filter

條件過濾時使用,當結果爲true時,輸出記錄;

  1.  
    inputStream.filter( new FilterFunction<Integer>() {
  2.  
    @Override
  3.  
    public boolean filter(Integer value) throws Exception {
  4.  
    return value != 1;
  5.  
    }
  6.  
    });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2.2.3.4 keyBy

邏輯上按照key分組,內部使用hash函數進行分組,返回keyedDataStream:

inputStream.keyBy("someKey");
  • 1

2.2.3.5 Reduce

keyedStream流上,將上一次reduce的結果和本次的進行操做,例如sum reduce的例子:

  1.  
    keyedInputStream. reduce( new ReduceFunction<Integer>() {
  2.  
    @Override
  3.  
    public Integer reduce(Integer value1, Integer value2)
  4.  
    throws Exception {
  5.  
    return value1 + value2;
  6.  
    }
  7.  
    });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2.2.3.6 Fold

在keyedStream流上的記錄進行鏈接操做,例如:

  1.  
    keyedInputStream keyedStream .fold("Start", new FoldFunction<Integer,
  2.  
    String>() {
  3.  
    @ Override
  4.  
    public String fold(String current, Integer value) {
  5.  
    return current + "=" + value;
  6.  
    }
  7.  
    });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

假如是一個(1,2,3,4,5)的流,那麼結果將是:Start=1=2=3=4=5

2.2.3.7 Aggregation

在keyedStream上應用相似min、max等聚合操做:

  1.  
    keyedInputStream.sum(0)
  2.  
    keyedInputStream.sum("key")
  3.  
    keyedInputStream.min(0)
  4.  
    keyedInputStream.min("key")
  5.  
    keyedInputStream.max(0)
  6.  
    keyedInputStream.max("key")
  7.  
    keyedInputStream.minBy(0)
  8.  
    keyedInputStream.minBy("key")
  9.  
    keyedInputStream.maxBy(0)
  10.  
    keyedInputStream.maxBy("key")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2.2.3.8 Window

窗口功能容許在keyedStream上應用時間或者其餘條件(count或session),根據key分組作聚合操做。

流是無界的,爲了處理無界的流,咱們能夠將流切分到有界的窗口中去處理,根據指定的key,切分爲不一樣的窗口。咱們可使用Flink預約義的窗口分配器。固然你也能夠經過繼承WindowAssginer自定義分配器。

下面看看有哪些預約義的分配器。

2.2.3.8.1 Global windows

Global window的範圍是無限的,你須要指定觸發器來觸發窗口。一般來說,每一個數據按照指定的key分配到不一樣的窗口中,若是不指定觸發器,則窗口永遠不會觸發。

2.2.3.8.2 Tumbling Windows

Tumbling窗口是基於特定時間建立的,他們的大小固定,窗口間不會發生重合。例如你想基於event timen每隔10分鐘計算一次,這個窗口就很適合。

2.2.3.8.3 Sliding Windows

Sliding窗口的大小也是固定的,但窗口之間會發生重合,例如你想基於event time每隔1分鐘,統一過去10分鐘的數據時,這個窗口就很適合。

2.2.3.8.4 Session Windows

Session窗口容許咱們設置一個gap時間,來決定在關閉一個session以前,咱們要等待多長時間,是衡量用戶活躍與否的標誌。

2.2.3.9 WindowAll

WindowAll操做不是基於key的,是對全局數據進行的計算。因爲不基於key,所以是非並行的,即並行度是1.在使用時性能會受些影響。

inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)));
  • 1

2.2.3.10 Union

Union功能就是在2個或多個DataStream上進行鏈接,成爲一個新的DataStream。

inputStream. union(inputStream1, inputStream2, ...)
  • 1

2.2.3.11 Join

Join容許在2個DataStream上基於相同的key進行鏈接操做,計算的範圍也是要基於一個window進行。

  1.  
    inputStream. join(inputStream1)
  2.  
    .where( 0).equalTo(1)
  3.  
    .window(TumblingEventTimeWindows.of( Time.seconds(5)))
  4.  
    .apply ( new JoinFunction () {...})
  • 1
  • 2
  • 3
  • 4

2.2.3.12 Split

Split的功能是根據某些條件將一個流切分爲2個或多個流。例如你有一個混合數據的流,根據數據自身的某些特徵,將其劃分到多個不一樣的流單獨處理。

  1.  
    SplitStream <Integer> split = inputStream.split(new
  2.  
    OutputSelector <Integer>() {
  3.  
    @Override
  4.  
    public Iterable<String> select(Integer value) {
  5.  
    List<String> output = new ArrayList<String>();
  6.  
    if (value % 2 == 0) {
  7.  
    output .add("even");
  8.  
    } else {
  9.  
    output .add("odd");}
  10.  
    return output;
  11.  
    }
  12.  
    })
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2.2.3.13 select

DataStream根據選擇的字段,將流轉換爲新的流。

  1.  
    SplitStream< Integer> split;
  2.  
    DataStream< Integer> even = split.select("even");
  3.  
    DataStream< Integer> odd = split.select("odd");
  4.  
    DataStream< Integer> all = split.select("even","odd");
  • 1
  • 2
  • 3
  • 4

2.2.3.14 project

Project功能容許你選擇流中的一部分元素做爲新的數據流中的字段,至關於作個映射。

  1.  
    DataStream<Tuple4<Integer, Double, String, String>> in = // [...]
  2.  
    DataStream<Tuple2< String, String>> out = in.project(3,2);
  • 1
  • 2

2.2.4 物理分片

Flink容許咱們在流上執行物理分片,固然你能夠選擇自定義partitioning。

2.2.4.1 自定義partitioning

根據某個具體的key,將DataStream中的元素按照key從新進行分片,將相同key的元素聚合到一個線程中執行。

  1.  
    inputStream.partitionCustom(partitioner, "someKey");
  2.  
    inputStream.partitionCustom(partitioner, 0);
  • 1
  • 2

2.2.4.2 隨機partitioning

不根據具體的key,而是隨機將數據打散。

inputStream.shuffle();
  • 1

2.2.4.3 Rebalancing partitioning

內部使用round robin方法將數據均勻打散。這對於數據傾斜時是很好的選擇。

inputStream.rebalance();
  • 1

2.2.4.4 Rescaling

Rescaling是經過執行oepration算子來實現的。因爲這種方式僅發生在一個單一的節點,所以沒有跨網絡的數據傳輸。

inputStream.rescale();
  • 1

2.2.4.5 廣播

廣播用於將dataStream全部數據發到每個partition。

inputStream.broadcast();
  • 1

2.2.5 數據Sink

咱們最終須要將結果保存在某個地方,Flink提供了一些選項:

  1.  
    1)writeAsText():將結果以字符串的形式一行一行寫到文本文件中。
  2.  
     
  3.  
    2)writeAsCsV():保存爲csv格式。
  4.  
     
  5.  
    3)print()/printErr():標準輸出或錯誤輸出。輸出到Terminal或者out文件。
  6.  
     
  7.  
    4)writeUsingOutputFormat():自定義輸出格式,要考慮序列化與反序列化。
  8.  
     
  9.  
    5)writeUsingOutputFormat():也能夠輸出到socket,可是你須要定義SerializationSchema。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

對於Flink中的connector以及自定義輸出,後續的章節會講到。

2.2.6 Event Time和watermark

Flink Streaming API受到了Google DataFlow模型的啓發,支持3種不一樣類型的時間概念:

  1.  
    1) Event Time
  2.  
    2) Processing Time
  3.  
    3) Ingestion Time
  • 1
  • 2
  • 3

(1)Event Time 
事件發生的時間,通常數據中自帶時間戳。這就可能致使亂序的發生。

(2)Processing Time 
Processing Time是機器的時間,這種時間跟數據自己沒有關係,徹底依賴於機器的時間。

(3)Ingestion Time 
是數據進入到Flink的時間。注入時間比processing time更加昂貴(多了一個assign timestamp的步驟),可是其準確性相比processing time的處理更好。因爲是進入Flink才分配時間戳,所以沒法處理亂序。

咱們經過在env中設置時間屬性來選擇不一樣的時間概念:

  1.  
    final StreamExecutionEnvironment env =
  2.  
    StreamExecutionEnvironment .getExecutionEnvironment();
  3.  
    env .setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
  4.  
    //or
  5.  
    env .setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  6.  
    //or
  7.  
    env .setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

Flink提供了預約義的時間戳抽取器和水位線生成器。參考:

Pre-defined Timestamp Extractors / Watermark Emitters

2.2.7 connectors鏈接器

2.2.7.1 Kafka connector

kafka是一個基於發佈、訂閱的分佈式消息系統。Flink定義了kafka consumer做爲數據源。咱們只須要引入特定的依賴便可(這裏以kafka 0.9爲例):

  1.  
    <dependency>
  2.  
    <groupId>org.apache.flink</groupId>
  3.  
    <artifactId>flink-connector-kafka-0.9_2.11/artifactId>
  4.  
    <version>1.1.4</version>
  5.  
    </dependency
  • 1
  • 2
  • 3
  • 4
  • 5

在使用時,咱們須要指定topic name以及反序列化器:

  1.  
    Properties properties = new Properties();
  2.  
    properties.setProperty( "bootstrap.servers", "localhost:9092");
  3.  
    properties.setProperty( "group.id", "test");
  4.  
    DataStream< String> input = env.addSource(new
  5.  
    FlinkKafkaConsumer09< String>("mytopic", new SimpleStringSchema(),
  6.  
    properties));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Flink默認支持String和Json的反序列化。

Kafka consumer在實現時實現了檢查點功能,所以失敗恢復時能夠重發。

Kafka除了consumer外,咱們也能夠將結果輸出到kafka。即kafka producer。例如:

  1.  
    stream.addSink( new FlinkKafkaProducer09[String]("localhost:9092",
  2.  
    "mytopic", new SimpleStringSchema()))
  • 1
  • 2

2.2.7.2 Twitter connector

用twitter做爲數據源,首先你須要用於twitter帳號。以後你須要建立twitter應用並認證。

這裏有個幫助文檔:https://dev.twitter.com/oauth/overview/application-owner-access-tokens

Pom中添加依賴:

  1.  
    <dependency>
  2.  
    <groupId>org.apache.flink</groupId>
  3.  
    <artifactId>flink-connector-twitter_2.11/artifactId>
  4.  
    <version>1.1.4</version>
  5.  
    </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

API:

  1.  
    Properties props = new Properties();
  2.  
    props .setProperty(TwitterSource.CONSUMER_KEY, "");
  3.  
    props .setProperty(TwitterSource.CONSUMER_SECRET, "");
  4.  
    props .setProperty(TwitterSource.TOKEN, "");
  5.  
    props .setProperty(TwitterSource.TOKEN_SECRET, "");
  6.  
    DataStream< String> streamSource = env.addSource(new
  7.  
    TwitterSource(props)) ;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2.2.7.3 RabbitMQ connector

2.2.7.4 ElasticSearch connector

2.2.7.5 Cassandra connector

這3個connetor略過,殼參考官方文檔:

https://flink.apache.org/ecosystem.html

2.2.8 例子

這裏能夠參考OSCON的例子:

https://github.com/dataArtisans/oscon

2.2.9 總結

本章介紹了Flink的DataStream API,下一章將介紹DataSet API。

相關文章
相關標籤/搜索