1、初識Flinkjava
官網:https://flink.apache.org/web
Apache Flink是一款分佈式、高性能、高可用、高精確的爲數據流應用而生的開源流式處理框架。在 2014 被 Apache 孵化器所接受,而後迅速地成爲了 ASF(Apache Software Foundation)的頂級項目之一。數據庫
Flink核心是用Java和Scala編寫的一個流式的數據流執行引擎,其針對數據流的分佈式計算提供了數據分佈、數據通訊以及容錯機制等功能。apache
可對無限數據流(實時流)和有限數據流(批處理)和進行有狀態計算。可部署在各類集羣環境,對各類大小的數據規模進行快速計算。api
Flink原生支持了迭代計算、內存管理和程序優化。服務器
2、Flink基本架構介紹架構
具體組件:app
上圖大體能夠分爲三塊內容:左邊爲數據輸入、右邊爲數據輸出、中間爲Flink數據處理。框架
Flink支持消息隊列的Events(支持實時的事件)的輸入,上游源源不斷產生數據放入消息隊列,Flink不斷消費、處理消息隊列中的數據,處理完成以後數據寫入下游系統,這個過程是不斷持續的進行。運維
數據源:
1.Transactions:即交易數據。好比各類電商平臺用戶下單,這個數據源源不斷寫入消息隊列
2.Logs:好比web應用運行過程當中產生的錯誤日誌信息,源源不斷髮送到消息隊列中,後續Flink處理爲運維部門提供監控依據。
3.IOT:即物聯網,英文全稱爲Internet of things。物聯網的終端設備,好比華爲手環、小米手環,源源不斷的產生數據寫入消息隊列,後續Flink處理提供健康報告
4.Clicks:即點擊流,好比打開淘寶網站,淘寶網站頁面上埋有不少數據採集點或者探針,當用戶點擊淘寶頁面的時候,它會採集用戶點擊行爲的詳細信息,這些用戶的點擊行爲產生的數據流咱們稱爲點擊流。
數據輸入系統:
Flink既支持實時(Real-time)流處理,又支持批處理。實時流消息系統,好比Kafka。批處理系統有不少,DataBase(好比傳統MySQL、Oracle數據庫),KV-Store(好比HBase、MongoDB數據庫),File System(好比本地文件系統、分佈式文件系統HDFS)。
Flink數據處理:
Flink在數據處理過程當中,資源管理調度可使用K8s(Kubernetes 簡稱K8s,是Google開源的一個容器編排引擎)、YARN、Mesos,中間數據存儲可使用HDFS、S三、NFS等
數據輸出:
Flink能夠將處理後的數據輸出下游的應用(Application),也能夠將處理事後的數據寫入消息隊列(好比Kafka),還能夠將處理後的輸入寫入Database、File System和KV-Store。
3、Flink核心組件棧
從上圖能夠看出Flink的底層是Deploy,Flink能夠Local模式運行,啓動單個 JVM。Flink也能夠Standalone 集羣模式運行,同時也支持Flink ON YARN,Flink應用直接提交到YARN上面運行。另外Flink還能夠運行在GCE(谷歌雲服務)和EC2(亞馬遜雲服務)。
Deploy的上層是Flink的核心(Core)部分Runtime。在Runtime之上提供了兩套核心的API,DataStream API(流處理)和DataSet API(批處理)。在覈心API之上又擴展了一些高階的庫和API,好比CEP流處理,Table API和SQL,Flink ML機器學習庫,Gelly圖計算。SQL既能夠跑在DataStream API,又能夠跑在DataSet API。
4、Flink的前世此生
Flink在發展過程的關鍵時刻:
誕生於2009年,原來叫StratoSphere,是柏林工業大學的一個研究性項目,早期專一於批計算。
2014年孵化出Flink項目並捐給了Apache。
2015年開始引發你們注意,出如今大數據舞臺。
2016年在阿里獲得大規模應用。
5、爲何是Flink?
大數據生態圈很龐大,優秀的框架和組件,爲什麼Flink如此受寵?
1. 從技術角度來講,目前大數據計算引擎中, 可以同時支持流處理和批處理的計算引擎,只有Spark和Flink(Storm只支持流處理)。其中Spark的技術理念是基於微批處理來模擬流的計算。而Flink則徹底相反,它採用的是基於流計算來模擬批計算。從技術發展方向看,用批來模擬流有必定的技術侷限性,而且這個侷限性可能很難突破。而Flink基於流來模擬批,在技術上有更好的擴展性。
2. 從語言方面來講,提供友好且優雅流暢的java和scala api和支持,java用戶衆多也是一個重要緣由。
3. 大公司的風向標做用, 阿里全面轉向Flink無疑是一個催化劑。目前,阿里巴巴全部的業務,包括阿里巴巴全部子公司都採用了基於Flink搭建的實時計算平臺。
阿里巴巴計算平臺事業部資深技術專家莫問在雲棲大會的演講內容 —— 阿里巴巴爲何選擇Apache Flink?這個框架的性能表現確實很優秀, Flink最初上線阿里巴巴只有數百臺服務器,目前規模已達上萬臺,此等規模在全球範圍內也是屈指可數;基於Flink,阿里內部積累起來的狀態數據已是PB級別規模;現在天天在阿里Flink的計算平臺上,處理的數據已經超過萬億條;在峯值期間能夠承擔每秒超過4.72億次的訪問,最典型的應用場景是阿里巴巴雙11大屏。
其實不光阿里,國內不少一線的公司都投入不少人力和財力在Flink實時計算上。
6、流式計算的表明:Flink、Spark Streaming、Storm對比
對比分析與建議:
若是對延遲要求不高的狀況下,建議使用Spark Streaming,豐富的高級,使用簡單,自然對接Spark生態棧中的其餘組件,吞吐量大,部署簡單,UI界面也作的更加智能,社區活躍度較高,有問題響應速度也是比較快的,比較適合作流式的ETL,並且Spark的發展勢頭也是有目共睹的,相信將來性能和功能將會更加完善。
若是對延遲性要求比較高的話,建議能夠嘗試下Flink,Flink是目前發展比較火的一個流系統,採用原生的流處理系統,保證了低延遲性,在和容錯上也是作的比較完善,使用起來相對來講也是比較簡單的,部署容易,並且發展勢頭也愈來愈好,相信後面社區問題的響應速度應該也是比較快的。
7、案例演示(java&scala)
一、maven依賴導入 <dependencies> <!--java--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.8.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.8.0</version> </dependency> <!--scala--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.8.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.8.0</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.7</version> </dependency> </dependencies> java代碼: package com.fwmagic.flink; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; import java.text.SimpleDateFormat; import java.util.Date; /** * 使用flink對指定窗口內的數據進行實時統計,最終把結果打印出來 * 先在機器上執行nc -lk 9000 */ public class StreamingWindowWordCountJava { public static void main(String[] args) throws Exception { //定義socket的端口號,默認9999 final int port; try { final ParameterTool parameterTool = ParameterTool.fromArgs(args); port = parameterTool.getInt("port", 9999); } catch (Exception e) { System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'"); return; } //獲取運行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //鏈接socket獲取輸入的數據 DataStreamSource<String> text = env.socketTextStream("localhost", port, "\n"); //計算數據 //拍平操做,把每行的單詞轉爲<word,count>類型的數據 DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() { public void flatMap(String value, Collector<WordWithCount> out) { String[] splits = value.split("\\s"); for (String word : splits) { out.collect(new WordWithCount(word, 1L)); } } //針對相同的word數據進行分組 }).keyBy("word") //指定計算數據的窗口大小和滑動窗口大小,每1秒計算一次最近5秒的結果 .timeWindow(Time.seconds(5),Time.seconds(1)) .sum("count"); windowCount.print().setParallelism(1); //把數據打印到控制檯,使用一個並行度 // windowCount.print().setParallelism(1); //注意:由於flink是懶加載的,因此必須調用execute方法,上面的代碼纔會執行 env.execute("streaming word count"); } /** * 存儲單詞以及單詞出現的次數 */ public static class WordWithCount { public String word; public long count; public WordWithCount() { } public WordWithCount(String word, long count) { this.word = word; this.count = count; } @Override public String toString() { String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); return date + ":{" + "word='" + word + '\'' + ", count=" + count + '}'; } } } scala代碼: package com.fwmagic.flink import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.scala._ object StreamingWWC { def main(args: Array[String]): Unit = { val parameterTool: ParameterTool = ParameterTool.fromArgs(args) val port: Int = parameterTool.getInt("port",9999) val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val text: DataStream[String] = env.socketTextStream("localhost",port) val wc: DataStream[WordCount] = text.flatMap(t => t.split(",")) .map(w => WordCount(w, 1)) .keyBy("word") .timeWindow(Time.seconds(5), Time.seconds(1)) .reduce((a,b) => WordCount(a.word,a.count+b.count)) //.sum("count") wc.print().setParallelism(1) env.execute("word count streaming !") } } case class WordCount(word:String,count:Long)
8、Flink部署
以local部署模式爲例,後續會介紹在yarn上部署:
下載:http://apache.website-solution.net/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.11.tgz
解壓:tar -zxvf flink-1.8.0-bin-scala_2.11.tgz
啓動:bin/start-cluster.sh
打包、提交任務:
mvn clean package
一、經過頁面的Submit new Job來提交任務
二、經過命令行提交:bin/flink run -c com.fwmagic.flink.StreamingWindowWordCountJava examples/myjar/fwmagic-flink.jar --port 6666
注意:提交任務前先開啓端口:nc -lk 6666
測試:
發送消息,在頁面中查看日誌:TaskManagers->點擊任務->Stdout
或者在命令行查看日誌:tail -f log/flink-*-taskexecutor-*.out
中止任務
1:web ui界面中止
2:命令行執行bin/flink cancel <job-id>