Flink是Apache的一個頂級項目,Apache Flink 是一個開源的分佈式流處理和批處理系統。Flink 的核心是在數據流上提供數據分發、通訊、具有容錯的分佈式計算。同時,Flink 在流處理引擎上構建了批處理引擎,原生支持了迭代計算、內存管理和程序優化。html
現有的開源計算方案,會把流處理和批處理做爲兩種不一樣的應用類型,由於它們所提供的SLA(Service-Level-Aggreement)是徹底不相同的:流處理通常須要支持低延遲、Exactly-once保證,而批處理須要支持高吞吐、高效處理。java
Flink從另外一個視角看待流處理和批處理,將兩者統一塊兒來:Flink是徹底支持流處理,也就是說做爲流處理看待時輸入數據流是×××的;批處理被做爲一種特殊的流處理,只是它的輸入數據流被定義爲有界的。web
Flink流處理特性:apache
Flink架構圖:編程
Flink以層級式系統形式組件其軟件棧,不一樣層的棧創建在其下層基礎上,而且各層接受程序不一樣層的抽象形式。緩存
在最基本的層面上,一個Flink應用程序是由如下幾部分組成:bash
以下圖:session
目前Flink支持以下框架:架構
Flink官網地址以下:app
部份內容參考自以下文章:
Flink下載地址:
Flink快速開始文檔地址:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/setup_quickstart.html
注:安裝Flink以前系統中須要安裝有jdk1.7以上版本的環境
我這裏下載的是2.6版本的Flink:
[root@study-01 ~]# cd /usr/local/src/ [root@study-01 /usr/local/src]# wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.4.2/flink-1.4.2-bin-hadoop26-scala_2.11.tgz [root@study-01 /usr/local/src]# tar -zxvf flink-1.4.2-bin-hadoop26-scala_2.11.tgz -C /usr/local [root@study-01 /usr/local/src]# cd ../flink-1.4.2/ [root@study-01 /usr/local/flink-1.4.2]# ls bin conf examples lib LICENSE log NOTICE opt README.txt resources tools [root@study-01 /usr/local/flink-1.4.2]#
啓動Flink:
[root@study-01 /usr/local/flink-1.4.2]# ./bin/start-local.sh [root@study-01 /usr/local/flink-1.4.2]# jps 6576 Jps 6131 JobManager 6499 TaskManager [root@study-01 /usr/local/flink-1.4.2]#
啓動成功以後就能夠訪問主機ip的8081端口,進入到Flink的web頁面:
咱們如今就能夠開始實現wordcount案例了,我這裏有一個文件,內容以下:
[root@study-01 /usr/local/flink-1.4.2]# cat /data/hello.txt hadoop welcome hadoop hdfs mapreduce hadoop hdfs hello hadoop spark vs mapreduce [root@study-01 /usr/local/flink-1.4.2]#
執行以下命令,實現wordcount案例,若是學習過Hadoop會發現這個命令和Hadoop上使用MapReduce實現wordcount案例是相似的:
[root@study-01 /usr/local/flink-1.4.2]# ./bin/flink run ./examples/batch/WordCount.jar --input file:///data/hello.txt --output file:///data/tmp/flink_wordcount_out
執行完成後,能夠到web頁面上,查看任務的執行信息:
查看輸出結果:
[root@study-01 /usr/local/flink-1.4.2]# cat /data/tmp/flink_wordcount_out hadoop 4 hdfs 2 hello 1 mapreduce 2 spark 1 vs 1 welcome 1 [root@study-01 /usr/local/flink-1.4.2]#
Google的新老三駕馬車:
咱們都知道,Hadoop生態圈內的幾個框架都源於Google老的三駕馬車,而一些新的框架實現也是部分源於Google新的三駕馬車的概念。因此如今市面上的大數據相關框架不少,框架多就會致使編程規範多、處理模式不一致,而咱們但願有一個工具可以統一這些編程模型,所以,Beam就誕生了。
Apache Beam是 Apache 軟件基金會於2017年1 月 10 日對外宣佈的開源平臺。Beam 爲建立複雜數據平行處理管道,提供了一個可移動(兼容性好)的 API 層。這層 API 的核心概念基於 Beam 模型(之前被稱爲 Dataflow 模型),並在每一個 Beam 引擎上不一樣程度得執行。
背景:
2016 年 2 月份,谷歌及其合做夥伴向 Apache 捐贈了一大批代碼,創立了孵化中的 Beam 項目( 最初叫 Apache Dataflow)。這些代碼中的大部分來自於谷歌 Cloud Dataflow SDK——開發者用來寫流處理和批處理管道(pipelines)的庫,可在任何支持的執行引擎上運行。當時,支持的主要引擎是谷歌 Cloud Dataflow,附帶對 Apache Spark 和 開發中的 Apache Flink 支持。現在,它正式開放之時,已經有五個官方支持的引擎。除去已經提到的三個,還包括 Beam 模型和 Apache Apex。
Beam特色:
Beam的官方網站:
Beam Java的快速開始文檔:
安裝Beam的前置也是須要系統具有jdk1.7以上版本的環境,以及Maven環境。
使用以下命令下載Beam以及wordcount案例代碼:
mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=2.4.0 \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
進入下載後的目錄進行查看:
[root@study-01 /usr/local/src]# cd word-count-beam/ [root@study-01 /usr/local/src/word-count-beam]# tree . ├── pom.xml └── src ├── main │ └── java │ └── org │ └── apache │ └── beam │ └── examples │ ├── common │ │ ├── ExampleBigQueryTableOptions.java │ │ ├── ExampleOptions.java │ │ ├── ExamplePubsubTopicAndSubscriptionOptions.java │ │ ├── ExamplePubsubTopicOptions.java │ │ ├── ExampleUtils.java │ │ └── WriteOneFilePerWindow.java │ ├── complete │ │ └── game │ │ ├── GameStats.java │ │ ├── HourlyTeamScore.java │ │ ├── injector │ │ │ ├── Injector.java │ │ │ ├── InjectorUtils.java │ │ │ └── RetryHttpInitializerWrapper.java │ │ ├── LeaderBoard.java │ │ ├── StatefulTeamScore.java │ │ ├── UserScore.java │ │ └── utils │ │ ├── GameConstants.java │ │ ├── WriteToBigQuery.java │ │ ├── WriteToText.java │ │ └── WriteWindowedToBigQuery.java │ ├── DebuggingWordCount.java │ ├── MinimalWordCount.java │ ├── WindowedWordCount.java │ └── WordCount.java └── test └── java └── org └── apache └── beam └── examples ├── complete │ └── game │ ├── GameStatsTest.java │ ├── HourlyTeamScoreTest.java │ ├── LeaderBoardTest.java │ ├── StatefulTeamScoreTest.java │ └── UserScoreTest.java ├── DebuggingWordCountTest.java ├── MinimalWordCountTest.java └── WordCountTest.java 20 directories, 31 files [root@study-01 /usr/local/src/word-count-beam]#
默認狀況下,beam的runner是Direct,下面就用Direct來運行wordcount案例,命令以下:
[root@study-01 /usr/local/src/word-count-beam]# ls pom.xml src target [root@study-01 /usr/local/src/word-count-beam]# [root@study-01 /usr/local/src/word-count-beam]# mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=/data/hello.txt --output=counts" -Pdirect-runner
運行的結果會存放在當前的目錄下:
[root@study-01 /usr/local/src/word-count-beam]# ls counts-00000-of-00003 counts-00001-of-00003 counts-00002-of-00003 pom.xml src target [root@study-01 /usr/local/src/word-count-beam]# more counts* # 查看結果文件 :::::::::::::: counts-00000-of-00003 :::::::::::::: welcome: 1 spark: 1 :::::::::::::: counts-00001-of-00003 :::::::::::::: hdfs: 2 hadoop: 4 mapreduce: 2 :::::::::::::: counts-00002-of-00003 :::::::::::::: hello: 1 vs: 1 [root@study-01 /usr/local/src/word-count-beam]#
若是須要指定其餘的runner則可使用--runner參數進行指定,例如我要指定runner爲Flink,則修改命令以下便可:
[root@study-01 /usr/local/src/word-count-beam]# mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=FlinkRunner --inputFile=/data/hello.txt --output=counts" -Pflink-runner
刪除以前生成的文件及目錄,咱們來使用Spark的方式進行運行。使用Spark的話,也只是修改--runner以及-Pspark參數便可:
[root@study-01 /usr/local/src/word-count-beam]# mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=SparkRunner --inputFile=/data/hello.txt --output=counts" -Pspark-runner
運行成功後,也是會生成以下文件及目錄:
[root@study-01 /usr/local/src/word-count-beam]# ls counts-00000-of-00003 counts-00001-of-00003 counts-00002-of-00003 pom.xml src target [root@study-01 /usr/local/src/word-count-beam]#
查看處理結果:
[root@study-01 /usr/local/src/word-count-beam]# more counts* :::::::::::::: counts-00000-of-00003 :::::::::::::: spark: 1 :::::::::::::: counts-00001-of-00003 :::::::::::::: welcome: 1 hello: 1 mapreduce: 2 :::::::::::::: counts-00002-of-00003 :::::::::::::: vs: 1 hdfs: 2 hadoop: 4 [root@study-01 /usr/local/src/word-count-beam]#
以上這兩個示例只是想說明一點,同一份代碼,能夠運行在不一樣的計算引擎上。不須要爲不一樣的引擎開發不一樣的代碼,這就是Beam框架的最主要的設計目的之一。