這幾年大數據的飛速發展,出現了不少熱門的開源社區,其中著名的有 Hadoop、Storm,以及後來的 Spark,他們都有着各自專一的應用場景。Spark 掀開了內存計算的先河,也之內存爲賭注,贏得了內存計算的飛速發展。Spark 的火熱或多或少的掩蓋了其餘分佈式計算的系統身影。就像 Flink,也就在這個時候默默的發展着。html
在國外一些社區,有不少人將大數據的計算引擎分紅了 4 代,固然,也有不少人不會認同。咱們先姑且這麼認爲和討論。web
首先第一代的計算引擎,無疑就是 Hadoop 承載的 MapReduce。這裏你們應該都不會對 MapReduce 陌生,它將計算分爲兩個階段,分別爲 Map 和 Reduce。對於上層應用來講,就不得不千方百計去拆分算法,甚至於不得不在上層應用實現多個 Job 的串聯,以完成一個完整的算法,例如迭代計算。算法
因爲這樣的弊端,催生了支持 DAG 框架的產生。所以,支持 DAG 的框架被劃分爲第二代計算引擎。如 Tez 以及更上層的 Oozie。這裏咱們不去細究各類 DAG 實現之間的區別,不過對於當時的 Tez 和 Oozie 來講,大多仍是批處理的任務。apache
接下來就是以 Spark 爲表明的第三代的計算引擎。第三代計算引擎的特色主要是 Job 內部的 DAG 支持(不跨越 Job),以及強調的實時計算。在這裏,不少人也會認爲第三代計算引擎也可以很好的運行批處理的 Job。瀏覽器
隨着第三代計算引擎的出現,促進了上層應用快速發展,例如各類迭代計算的性能以及對流計算和 SQL 等的支持。Flink 的誕生就被歸在了第四代。這應該主要表如今 Flink 對流計算的支持,以及更一步的實時性上面。固然 Flink 也能夠支持 Batch 的任務,以及 DAG 的運算。session
或許會有人不一樣意以上的分類,我以爲其實這並不重要的,重要的是體會各個框架的差別,以及更適合的場景。並進行理解,沒有哪個框架能夠完美的支持全部的場景,也就不可能有任何一個框架能徹底取代另外一個,就像 Spark 沒有徹底取代 Hadoop,固然 Flink 也不可能取代 Spark。本文將致力描述 Flink 的原理以及應用。架構
不少人可能都是在 2015 年才聽到 Flink 這個詞,其實早在 2008 年,Flink 的前身已是柏林理工大學一個研究性項目, 在 2014 被 Apache 孵化器所接受,而後迅速地成爲了 ASF(Apache Software Foundation)的頂級項目之一。Flink 的最新版本目前已經更新到了 0.10.0 了,在不少人感慨 Spark 的快速發展的同時,或許咱們也該爲 Flink 的發展速度點個贊。app
Flink 是一個針對流數據和批數據的分佈式處理引擎。它主要是由 Java 代碼實現。目前主要仍是依靠開源社區的貢獻而發展。對 Flink 而言,其所要處理的主要場景就是流數據,批數據只是流數據的一個極限特例而已。再換句話說,Flink 會把全部任務當成流來處理,這也是其最大的特色。Flink 能夠支持本地的快速迭代,以及一些環形的迭代任務。而且 Flink 能夠定製化內存管理。在這點,若是要對比 Flink 和 Spark 的話,Flink 並無將內存徹底交給應用層。這也是爲何 Spark 相對於 Flink,更容易出現 OOM 的緣由(out of memory)。就框架自己與應用場景來講,Flink 更類似與 Storm。若是以前瞭解過 Storm 或者 Flume 的讀者,可能會更容易理解 Flink 的架構和不少概念。下面讓咱們先來看下 Flink 的架構圖。框架
如圖 1 所示,咱們能夠了解到 Flink 幾個最基礎的概念,Client、JobManager 和 TaskManager。Client 用來提交任務給 JobManager,JobManager 分發任務給 TaskManager 去執行,而後 TaskManager 會心跳的彙報任務狀態。看到這裏,有的人應該已經有種回到 Hadoop 一代的錯覺。確實,從架構圖去看,JobManager 很像當年的 JobTracker,TaskManager 也很像當年的 TaskTracker。然而有一個最重要的區別就是 TaskManager 之間是是流(Stream)。其次,Hadoop 一代中,只有 Map 和 Reduce 之間的 Shuffle,而對 Flink 而言,多是不少級,而且在 TaskManager 內部和 TaskManager 之間都會有數據傳遞,而不像 Hadoop,是固定的 Map 到 Reduce。curl
在 Flink 集羣中,計算資源被定義爲 Task Slot。每一個 TaskManager 會擁有一個或多個 Slots。JobManager 會以 Slot 爲單位調度 Task。可是這裏的 Task 跟咱們在 Hadoop 中的理解是有區別的。對 Flink 的 JobManager 來講,其調度的是一個 Pipeline 的 Task,而不是一個點。舉個例子,在 Hadoop 中 Map 和 Reduce 是兩個獨立調度的 Task,而且都會去佔用計算資源。對 Flink 來講 MapReduce 是一個 Pipeline 的 Task,只佔用一個計算資源。類同的,若是有一個 MRR 的 Pipeline Task,在 Flink 中其也是一個被總體調度的 Pipeline Task。在 TaskManager 中,根據其所擁有的 Slot 個數,同時會擁有多個 Pipeline。
在 Flink StandAlone 的部署模式中,這個還比較容易理解。由於 Flink 自身也須要簡單的管理計算資源(Slot)。當 Flink 部署在 Yarn 上面以後,Flink 並無弱化資源管理。也就是說這時候的 Flink 在作一些 Yarn 該作的事情。從設計角度來說,我認爲這是不太合理的。若是 Yarn 的 Container 沒法徹底隔離 CPU 資源,這時候對 Flink 的 TaskManager 配置多個 Slot,應該會出現資源不公平利用的現象。Flink 若是想在數據中心更好的與其餘計算框架共享計算資源,應該儘可能不要干預計算資源的分配和定義。
須要深度學習 Flink 調度讀者,能夠在 Flink 的源碼目錄中找到 flink-runtime 這個文件夾,JobManager 的 code 基本都在這裏。
一個計算框架要有長遠的發展,必須打造一個完整的 Stack。否則就跟紙上談兵同樣,沒有任何意義。只有上層有了具體的應用,並能很好的發揮計算框架自己的優點,那麼這個計算框架才能吸引更多的資源,纔會更快的進步。因此 Flink 也在努力構建本身的 Stack。
Flink 首先支持了 Scala 和 Java 的 API,Python 也正在測試中。Flink 經過 Gelly 支持了圖操做,還有機器學習的 FlinkML。Table 是一種接口化的 SQL 支持,也就是 API 支持,而不是文本化的 SQL 解析和執行。對於完整的 Stack 咱們能夠參考下圖。
Flink 爲了更普遍的支持大數據的生態圈,其下也實現了不少 Connector 的子項目。最熟悉的,固然就是與 Hadoop HDFS 集成。其次,Flink 也宣佈支持了 Tachyon、S3 以及 MapRFS。不過對於 Tachyon 以及 S3 的支持,都是經過 Hadoop HDFS 這層包裝實現的,也就是說要使用 Tachyon 和 S3,就必須有 Hadoop,並且要更改 Hadoop 的配置(core-site.xml)。若是瀏覽 Flink 的代碼目錄,咱們就會看到更多 Connector 項目,例如 Flume 和 Kafka。
Flink 有三種部署模式,分別是 Local、Standalone Cluster 和 Yarn Cluster。對於 Local 模式來講,JobManager 和 TaskManager 會公用一個 JVM 來完成 Workload。若是要驗證一個簡單的應用,Local 模式是最方便的。實際應用中大多使用 Standalone 或者 Yarn Cluster。下面我主要介紹下這兩種模式。
在搭建 Standalone 模式的 Flink 集羣以前,咱們須要先下載 Flink 安裝包。這裏咱們須要下載 Flink 針對 Hadoop 1.x 的包。下載並解壓後,進到 Flink 的根目錄,而後查看 conf 文件夾,以下圖。
咱們須要指定 Master 和 Worker。Master 機器會啓動 JobManager,Worker 則會啓動 TaskManager。所以,咱們須要修改 conf 目錄中的 master 和 slaves。在配置 master 文件時,須要指定 JobManager 的 UI 監聽端口。通常狀況下,JobManager 只需配置一個,Worker 則須配置一個或多個(以行爲單位)。示例以下:
1
2
3
4
5
|
micledeMacBook-Pro:conf micle$ cat masters
localhost:8081
micledeMacBook-Pro:conf micle$ cat slaves
localhost
|
在 conf 目錄中找到文件 flink-conf.yaml。在這個文件中定義了 Flink 各個模塊的基本屬性,如 RPC 的端口,JobManager 和 TaskManager 堆的大小等。在不考慮 HA 的狀況下,通常只須要修改屬性 taskmanager.numberOfTaskSlots,也就是每一個 Task Manager 所擁有的 Slot 個數。這個屬性,通常設置成機器 CPU 的 core 數,用來平衡機器之間的運算性能。其默認值爲 1。配置完成後,使用下圖中的命令啓動 JobManager 和 TaskManager(啓動以前,須要確認 Java 的環境是否已經就緒)。
啓動以後咱們就能夠登錄 Flink 的 GUI 頁面。在頁面中咱們能夠看到 Flink 集羣的基本屬性,在 JobManager 和 TaskManager 的頁面中,能夠看到這兩個模塊的屬性。目前 Flink 的 GUI,只提供了簡單的查看功能,沒法動態修改配置屬性。通常在企業級應用中,這是很難被接受的。所以,一個企業真正要應用 Flink 的話,估計也不得不增強 WEB 的功能。
在一個企業中,爲了最大化的利用集羣資源,通常都會在一個集羣中同時運行多種類型的 Workload。所以 Flink 也支持在 Yarn 上面運行。首先,讓咱們經過下圖瞭解下 Yarn 和 Flink 的關係。
在圖中能夠看出,Flink 與 Yarn 的關係與 MapReduce 和 Yarn 的關係是同樣的。Flink 經過 Yarn 的接口實現了本身的 App Master。當在 Yarn 中部署了 Flink,Yarn 就會用本身的 Container 來啓動 Flink 的 JobManager(也就是 App Master)和 TaskManager。
瞭解了 Flink 與 Yarn 的關係,咱們就簡單看下部署的步驟。在這以前須要先部署好 Yarn 的集羣,這裏我就不作介紹了。咱們能夠經過如下的命令查看 Yarn 中現有的 Application,而且來檢查 Yarn 的狀態。
1
|
yarn application –list
|
若是命令正確返回了,就說明 Yarn 的 RM 目前已經在啓動狀態。針對不一樣的 Yarn 版本,Flink 有不一樣的安裝包。咱們能夠在 Apache Flink 的下載頁中找到對應的安裝包。個人 Yarn 版本爲 2.7.1。再介紹具體的步驟以前,咱們須要先了解 Flink 有兩種在 Yarn 上面的運行模式。一種是讓 Yarn 直接啓動 JobManager 和 TaskManager,另外一種是在運行 Flink Workload 的時候啓動 Flink 的模塊。前者至關於讓 Flink 的模塊處於 Standby 的狀態。這裏,我也主要介紹下前者。
在下載和解壓 Flink 的安裝包以後,須要在環境中增長環境變量 HADOOP_CONF_DIR 或者 YARN_CONF_DIR,其指向 Yarn 的配置目錄。如運行下面的命令:
1
|
export HADOOP_CONF_DIR=/etc/hadoop/conf
|
這是由於 Flink 實現了 Yarn 的 Client,所以須要 Yarn 的一些配置和 Jar 包。在配置好環境變量後,只需簡單的運行以下的腳本,Yarn 就會啓動 Flink 的 JobManager 和 TaskManager。
1
|
yarn-session.sh –d –s 2 –tm 800 –n 2
|
上面的命令的意思是,向 Yarn 申請 2 個 Container 啓動 TaskManager(-n 2),每一個 TaskManager 擁有兩個 Task Slot(-s 2),而且向每一個 TaskManager 的 Container 申請 800M 的內存。在上面的命令成功後,咱們就能夠在 Yarn Application 頁面看到 Flink 的紀錄。以下圖。
若是有些讀者在虛擬機中測試,可能會遇到錯誤。這裏須要注意內存的大小,Flink 向 Yarn 會申請多個 Container,可是 Yarn 的配置可能限制了 Container 所能申請的內存大小,甚至 Yarn 自己所管理的內存就很小。這樣極可能沒法正常啓動 TaskManager,尤爲當指定多個 TaskManager 的時候。所以,在啓動 Flink 以後,須要去 Flink 的頁面中檢查下 Flink 的狀態。這裏能夠從 RM 的頁面中,直接跳轉(點擊 Tracking UI)。這時候 Flink 的頁面如圖 8。
對於 Flink 安裝時的 Trouble-shooting,可能更多時候須要查看 Yarn 相關的 log 來分析。這裏就很少作介紹,讀者能夠到 Yarn 相關的描述中查找。
對於一個企業級的應用,穩定性是首要要考慮的問題,而後纔是性能,所以 HA 機制是必不可少的。另外,對於已經瞭解 Flink 架構的讀者,可能會更擔憂 Flink 架構背後的單點問題。和 Hadoop 一代同樣,從架構中咱們能夠很明顯的發現 JobManager 有明顯的單點問題(SPOF,single point of failure)。 JobManager 肩負着任務調度以及資源分配,一旦 JobManager 出現意外,其後果可想而知。Flink 對 JobManager HA 的處理方式,原理上基本和 Hadoop 同樣(一代和二代)。
首先,咱們須要知道 Flink 有兩種部署的模式,分別是 Standalone 以及 Yarn Cluster 模式。對於 Standalone 來講,Flink 必須依賴於 Zookeeper 來實現 JobManager 的 HA(Zookeeper 已經成爲了大部分開源框架 HA 必不可少的模塊)。在 Zookeeper 的幫助下,一個 Standalone 的 Flink 集羣會同時有多個活着的 JobManager,其中只有一個處於工做狀態,其餘處於 Standby 狀態。當工做中的 JobManager 失去鏈接後(如宕機或 Crash),Zookeeper 會從 Standby 中選舉新的 JobManager 來接管 Flink 集羣。
對於 Yarn Cluaster 模式來講,Flink 就要依靠 Yarn 自己來對 JobManager 作 HA 了。其實這裏徹底是 Yarn 的機制。對於 Yarn Cluster 模式來講,JobManager 和 TaskManager 都是被 Yarn 啓動在 Yarn 的 Container 中。此時的 JobManager,其實應該稱之爲 Flink Application Master。也就說它的故障恢復,就徹底依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 同樣)。因爲徹底依賴了 Yarn,所以不一樣版本的 Yarn 可能會有細微的差別。這裏再也不作深究。
Flink 和其餘大多開源的框架同樣,提供了不少有用的 Rest API。不過 Flink 的 RestAPI,目前還不是很強大,只能支持一些 Monitor 的功能。Flink Dashboard 自己也是經過其 Rest 來查詢各項的結果數據。在 Flink RestAPI 基礎上,能夠比較容易的將 Flink 的 Monitor 功能和其餘第三方工具相集成,這也是其設計的初衷。
在 Flink 的進程中,是由 JobManager 來提供 Rest API 的服務。所以在調用 Rest 以前,要肯定 JobManager 是否處於正常的狀態。正常狀況下,在發送一個 Rest 請求給 JobManager 以後,Client 就會收到一個 JSON 格式的返回結果。因爲目前 Rest 提供的功能還很少,須要加強這塊功能的讀者能夠在子項目 flink-runtime-web 中找到對應的代碼。其中最關鍵一個類 WebRuntimeMonitor,就是用來對全部的 Rest 請求作分流的,若是須要添加一個新類型的請求,就須要在這裏增長對應的處理代碼。下面我例舉幾個經常使用 Rest API。
1.查詢 Flink 集羣的基本信息: /overview。示例命令行格式以及返回結果以下:
1
2
|
$ curl http://localhost:8081/overview{"taskmanagers":1,"slots-total":16,
"slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}
|
2.查詢當前 Flink 集羣中的 Job 信息:/jobs。示例命令行格式以及返回結果以下:
1
2
|
$ curl http://localhost:8081/jobs{"jobs-running":[],"jobs-finished":
["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}
|
3.查詢一個指定的 Job 信息: /jobs/jobid。這個查詢的結果會返回特別多的詳細的內容,這是我在瀏覽器中進行的測試,以下圖:
想要了解更多 Rest 請求內容的讀者,能夠去 Apache Flink 的頁面中查找。因爲篇幅有限,這裏就不一一列舉。
WordCount 的例子,就像是計算框架的 helloworld。這裏我就以 WordCount 爲例,介紹下如何在 Flink 中運行 workload。
在安裝好 Flink 的環境中,找到 Flink 的目錄。而後找到 bin/flink,它就是用來提交 Flink workload 的工具。對於 WordCount,咱們能夠直接使用已有的示例 jar 包。如運行以下的命令:
1
|
./bin/flink run ./examples/WordCount.jar hdfs://user/root/test hdfs://user/root/out
|
上面的命令是在 HDFS 中運行 WordCount,若是沒有 HDFS 用本地的文件系統也是能夠的,只須要將「hdfs://」換成「file://」。這裏咱們須要強調一種部署關係,就是 StandAlone 模式的 Flink,也是能夠直接訪問 HDFS 等分佈式文件系統的。
Flink 是一個比 Spark 起步晚的項目,可是並不表明 Flink 的前途就會暗淡。Flink 和 Spark 有不少相似之處,但也有不少明顯的差別。本文並無比較這二者之間的差別,這是將來我想與你們探討的。例如 Flink 如何更高效的管理內存,如何進一步的避免用戶程序的 OOM。在 Flink 的世界裏一切都是流,它更專一處理流應用。因爲其起步晚,加上社區的活躍度並無 Spark 那麼熱,因此其在一些細節的場景支持上,並無 Spark 那麼完善。例如目前在 SQL 的支持上並無 Spark 那麼平滑。在企業級應用中,Spark 已經開始落地,而 Flink 可能還須要一段時間的打磨。在後續文章中,我會詳細介紹如何開發 Flink 的程序,以及更多有關 Flink 內部實現的內容。
內容來自:https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-flink/