什麼是Apache Flink

大數據計算引擎的發展

     這幾年大數據的飛速發展,出現了不少熱門的開源社區,其中著名的有 Hadoop、Storm,以及後來的 Spark,他們都有着各自專一的應用場景。Spark 掀開了內存計算的先河,也之內存爲賭注,贏得了內存計算的飛速發展。Spark 的火熱或多或少的掩蓋了其餘分佈式計算的系統身影。就像 Flink,也就在這個時候默默的發展着。html

在國外一些社區,有不少人將大數據的計算引擎分紅了 4 代,固然,也有不少人不會認同。咱們先姑且這麼認爲和討論。web

首先第一代的計算引擎,無疑就是 Hadoop 承載的 MapReduce。這裏你們應該都不會對 MapReduce 陌生,它將計算分爲兩個階段,分別爲 Map 和 Reduce。對於上層應用來講,就不得不千方百計去拆分算法,甚至於不得不在上層應用實現多個 Job 的串聯,以完成一個完整的算法,例如迭代計算。算法

因爲這樣的弊端,催生了支持 DAG 框架的產生。所以,支持 DAG 的框架被劃分爲第二代計算引擎。如 Tez 以及更上層的 Oozie。這裏咱們不去細究各類 DAG 實現之間的區別,不過對於當時的 Tez 和 Oozie 來講,大多仍是批處理的任務。apache

接下來就是以 Spark 爲表明的第三代的計算引擎。第三代計算引擎的特色主要是 Job 內部的 DAG 支持(不跨越 Job),以及強調的實時計算。在這裏,不少人也會認爲第三代計算引擎也可以很好的運行批處理的 Job。瀏覽器

隨着第三代計算引擎的出現,促進了上層應用快速發展,例如各類迭代計算的性能以及對流計算和 SQL 等的支持。Flink 的誕生就被歸在了第四代。這應該主要表如今 Flink 對流計算的支持,以及更一步的實時性上面。固然 Flink 也能夠支持 Batch 的任務,以及 DAG 的運算。session

或許會有人不一樣意以上的分類,我以爲其實這並不重要的,重要的是體會各個框架的差別,以及更適合的場景。並進行理解,沒有哪個框架能夠完美的支持全部的場景,也就不可能有任何一個框架能徹底取代另外一個,就像 Spark 沒有徹底取代 Hadoop,固然 Flink 也不可能取代 Spark。本文將致力描述 Flink 的原理以及應用。架構

 

Flink 簡介

不少人可能都是在 2015 年才聽到 Flink 這個詞,其實早在 2008 年,Flink 的前身已是柏林理工大學一個研究性項目, 在 2014 被 Apache 孵化器所接受,而後迅速地成爲了 ASF(Apache Software Foundation)的頂級項目之一。Flink 的最新版本目前已經更新到了 0.10.0 了,在不少人感慨 Spark 的快速發展的同時,或許咱們也該爲 Flink 的發展速度點個贊。app

Flink 是一個針對流數據和批數據的分佈式處理引擎。它主要是由 Java 代碼實現。目前主要仍是依靠開源社區的貢獻而發展。對 Flink 而言,其所要處理的主要場景就是流數據,批數據只是流數據的一個極限特例而已。再換句話說,Flink 會把全部任務當成流來處理,這也是其最大的特色。Flink 能夠支持本地的快速迭代,以及一些環形的迭代任務。而且 Flink 能夠定製化內存管理。在這點,若是要對比 Flink 和 Spark 的話,Flink 並無將內存徹底交給應用層。這也是爲何 Spark 相對於 Flink,更容易出現 OOM 的緣由(out of memory)。就框架自己與應用場景來講,Flink 更類似與 Storm。若是以前瞭解過 Storm 或者 Flume 的讀者,可能會更容易理解 Flink 的架構和不少概念。下面讓咱們先來看下 Flink 的架構圖。框架

圖 1. Flink 架構圖

圖 1. Flink 架構圖

如圖 1 所示,咱們能夠了解到 Flink 幾個最基礎的概念,Client、JobManager 和 TaskManager。Client 用來提交任務給 JobManager,JobManager 分發任務給 TaskManager 去執行,而後 TaskManager 會心跳的彙報任務狀態。看到這裏,有的人應該已經有種回到 Hadoop 一代的錯覺。確實,從架構圖去看,JobManager 很像當年的 JobTracker,TaskManager 也很像當年的 TaskTracker。然而有一個最重要的區別就是 TaskManager 之間是是流(Stream)。其次,Hadoop 一代中,只有 Map 和 Reduce 之間的 Shuffle,而對 Flink 而言,多是不少級,而且在 TaskManager 內部和 TaskManager 之間都會有數據傳遞,而不像 Hadoop,是固定的 Map 到 Reduce。curl

Flink 中的調度簡述

在 Flink 集羣中,計算資源被定義爲 Task Slot。每一個 TaskManager 會擁有一個或多個 Slots。JobManager 會以 Slot 爲單位調度 Task。可是這裏的 Task 跟咱們在 Hadoop 中的理解是有區別的。對 Flink 的 JobManager 來講,其調度的是一個 Pipeline 的 Task,而不是一個點。舉個例子,在 Hadoop 中 Map 和 Reduce 是兩個獨立調度的 Task,而且都會去佔用計算資源。對 Flink 來講 MapReduce 是一個 Pipeline 的 Task,只佔用一個計算資源。類同的,若是有一個 MRR 的 Pipeline Task,在 Flink 中其也是一個被總體調度的 Pipeline Task。在 TaskManager 中,根據其所擁有的 Slot 個數,同時會擁有多個 Pipeline。

在 Flink StandAlone 的部署模式中,這個還比較容易理解。由於 Flink 自身也須要簡單的管理計算資源(Slot)。當 Flink 部署在 Yarn 上面以後,Flink 並無弱化資源管理。也就是說這時候的 Flink 在作一些 Yarn 該作的事情。從設計角度來說,我認爲這是不太合理的。若是 Yarn 的 Container 沒法徹底隔離 CPU 資源,這時候對 Flink 的 TaskManager 配置多個 Slot,應該會出現資源不公平利用的現象。Flink 若是想在數據中心更好的與其餘計算框架共享計算資源,應該儘可能不要干預計算資源的分配和定義。

須要深度學習 Flink 調度讀者,能夠在 Flink 的源碼目錄中找到 flink-runtime 這個文件夾,JobManager 的 code 基本都在這裏。

Flink 的生態圈

一個計算框架要有長遠的發展,必須打造一個完整的 Stack。否則就跟紙上談兵同樣,沒有任何意義。只有上層有了具體的應用,並能很好的發揮計算框架自己的優點,那麼這個計算框架才能吸引更多的資源,纔會更快的進步。因此 Flink 也在努力構建本身的 Stack。

Flink 首先支持了 Scala 和 Java 的 API,Python 也正在測試中。Flink 經過 Gelly 支持了圖操做,還有機器學習的 FlinkML。Table 是一種接口化的 SQL 支持,也就是 API 支持,而不是文本化的 SQL 解析和執行。對於完整的 Stack 咱們能夠參考下圖。

圖 2. Flink 的 Stack

圖 2. Flink 的 Stack

Flink 爲了更普遍的支持大數據的生態圈,其下也實現了不少 Connector 的子項目。最熟悉的,固然就是與 Hadoop HDFS 集成。其次,Flink 也宣佈支持了 Tachyon、S3 以及 MapRFS。不過對於 Tachyon 以及 S3 的支持,都是經過 Hadoop HDFS 這層包裝實現的,也就是說要使用 Tachyon 和 S3,就必須有 Hadoop,並且要更改 Hadoop 的配置(core-site.xml)。若是瀏覽 Flink 的代碼目錄,咱們就會看到更多 Connector 項目,例如 Flume 和 Kafka。

Flink 的部署

Flink 有三種部署模式,分別是 Local、Standalone Cluster 和 Yarn Cluster。對於 Local 模式來講,JobManager 和 TaskManager 會公用一個 JVM 來完成 Workload。若是要驗證一個簡單的應用,Local 模式是最方便的。實際應用中大多使用 Standalone 或者 Yarn Cluster。下面我主要介紹下這兩種模式。

Standalone 模式

在搭建 Standalone 模式的 Flink 集羣以前,咱們須要先下載 Flink 安裝包。這裏咱們須要下載 Flink 針對 Hadoop 1.x 的包。下載並解壓後,進到 Flink 的根目錄,而後查看 conf 文件夾,以下圖。

圖 3. Flink 的目錄結構

圖 3. Flink 的目錄結構

咱們須要指定 Master 和 Worker。Master 機器會啓動 JobManager,Worker 則會啓動 TaskManager。所以,咱們須要修改 conf 目錄中的 master 和 slaves。在配置 master 文件時,須要指定 JobManager 的 UI 監聽端口。通常狀況下,JobManager 只需配置一個,Worker 則須配置一個或多個(以行爲單位)。示例以下:

1
2
3
4
5
micledeMacBook-Pro:conf micle$ cat masters
localhost:8081
 
micledeMacBook-Pro:conf micle$ cat slaves
localhost

在 conf 目錄中找到文件 flink-conf.yaml。在這個文件中定義了 Flink 各個模塊的基本屬性,如 RPC 的端口,JobManager 和 TaskManager 堆的大小等。在不考慮 HA 的狀況下,通常只須要修改屬性 taskmanager.numberOfTaskSlots,也就是每一個 Task Manager 所擁有的 Slot 個數。這個屬性,通常設置成機器 CPU 的 core 數,用來平衡機器之間的運算性能。其默認值爲 1。配置完成後,使用下圖中的命令啓動 JobManager 和 TaskManager(啓動以前,須要確認 Java 的環境是否已經就緒)。

圖 4. 啓動 StandAlone 模式的 Flink

圖 4. 啓動 StandAlone 模式的 Flink

啓動以後咱們就能夠登錄 Flink 的 GUI 頁面。在頁面中咱們能夠看到 Flink 集羣的基本屬性,在 JobManager 和 TaskManager 的頁面中,能夠看到這兩個模塊的屬性。目前 Flink 的 GUI,只提供了簡單的查看功能,沒法動態修改配置屬性。通常在企業級應用中,這是很難被接受的。所以,一個企業真正要應用 Flink 的話,估計也不得不增強 WEB 的功能。

圖 5. Flink 的 GUI 頁面

圖 5. Flink 的 GUI 頁面

Yarn Cluster 模式

在一個企業中,爲了最大化的利用集羣資源,通常都會在一個集羣中同時運行多種類型的 Workload。所以 Flink 也支持在 Yarn 上面運行。首先,讓咱們經過下圖瞭解下 Yarn 和 Flink 的關係。

圖 6. Flink 與 Yarn 的關係

圖 6. Flink 與 Yarn 的關係

在圖中能夠看出,Flink 與 Yarn 的關係與 MapReduce 和 Yarn 的關係是同樣的。Flink 經過 Yarn 的接口實現了本身的 App Master。當在 Yarn 中部署了 Flink,Yarn 就會用本身的 Container 來啓動 Flink 的 JobManager(也就是 App Master)和 TaskManager。

瞭解了 Flink 與 Yarn 的關係,咱們就簡單看下部署的步驟。在這以前須要先部署好 Yarn 的集羣,這裏我就不作介紹了。咱們能夠經過如下的命令查看 Yarn 中現有的 Application,而且來檢查 Yarn 的狀態。

1
yarn application –list

若是命令正確返回了,就說明 Yarn 的 RM 目前已經在啓動狀態。針對不一樣的 Yarn 版本,Flink 有不一樣的安裝包。咱們能夠在 Apache Flink 的下載頁中找到對應的安裝包。個人 Yarn 版本爲 2.7.1。再介紹具體的步驟以前,咱們須要先了解 Flink 有兩種在 Yarn 上面的運行模式。一種是讓 Yarn 直接啓動 JobManager 和 TaskManager,另外一種是在運行 Flink Workload 的時候啓動 Flink 的模塊。前者至關於讓 Flink 的模塊處於 Standby 的狀態。這裏,我也主要介紹下前者。

在下載和解壓 Flink 的安裝包以後,須要在環境中增長環境變量 HADOOP_CONF_DIR 或者 YARN_CONF_DIR,其指向 Yarn 的配置目錄。如運行下面的命令:

1
export HADOOP_CONF_DIR=/etc/hadoop/conf

這是由於 Flink 實現了 Yarn 的 Client,所以須要 Yarn 的一些配置和 Jar 包。在配置好環境變量後,只需簡單的運行以下的腳本,Yarn 就會啓動 Flink 的 JobManager 和 TaskManager。

1
yarn-session.sh –d –s 2 –tm 800 –n 2

上面的命令的意思是,向 Yarn 申請 2 個 Container 啓動 TaskManager(-n 2),每一個 TaskManager 擁有兩個 Task Slot(-s 2),而且向每一個 TaskManager 的 Container 申請 800M 的內存。在上面的命令成功後,咱們就能夠在 Yarn Application 頁面看到 Flink 的紀錄。以下圖。

圖 7. Flink on Yarn

圖 7. Flink on Yarn

若是有些讀者在虛擬機中測試,可能會遇到錯誤。這裏須要注意內存的大小,Flink 向 Yarn 會申請多個 Container,可是 Yarn 的配置可能限制了 Container 所能申請的內存大小,甚至 Yarn 自己所管理的內存就很小。這樣極可能沒法正常啓動 TaskManager,尤爲當指定多個 TaskManager 的時候。所以,在啓動 Flink 以後,須要去 Flink 的頁面中檢查下 Flink 的狀態。這裏能夠從 RM 的頁面中,直接跳轉(點擊 Tracking UI)。這時候 Flink 的頁面如圖 8。

圖 8. Flink 的頁面

圖 8. Flink 的頁面

對於 Flink 安裝時的 Trouble-shooting,可能更多時候須要查看 Yarn 相關的 log 來分析。這裏就很少作介紹,讀者能夠到 Yarn 相關的描述中查找。

Flink 的 HA

對於一個企業級的應用,穩定性是首要要考慮的問題,而後纔是性能,所以 HA 機制是必不可少的。另外,對於已經瞭解 Flink 架構的讀者,可能會更擔憂 Flink 架構背後的單點問題。和 Hadoop 一代同樣,從架構中咱們能夠很明顯的發現 JobManager 有明顯的單點問題(SPOF,single point of failure)。 JobManager 肩負着任務調度以及資源分配,一旦 JobManager 出現意外,其後果可想而知。Flink 對 JobManager HA 的處理方式,原理上基本和 Hadoop 同樣(一代和二代)。

首先,咱們須要知道 Flink 有兩種部署的模式,分別是 Standalone 以及 Yarn Cluster 模式。對於 Standalone 來講,Flink 必須依賴於 Zookeeper 來實現 JobManager 的 HA(Zookeeper 已經成爲了大部分開源框架 HA 必不可少的模塊)。在 Zookeeper 的幫助下,一個 Standalone 的 Flink 集羣會同時有多個活着的 JobManager,其中只有一個處於工做狀態,其餘處於 Standby 狀態。當工做中的 JobManager 失去鏈接後(如宕機或 Crash),Zookeeper 會從 Standby 中選舉新的 JobManager 來接管 Flink 集羣。

對於 Yarn Cluaster 模式來講,Flink 就要依靠 Yarn 自己來對 JobManager 作 HA 了。其實這裏徹底是 Yarn 的機制。對於 Yarn Cluster 模式來講,JobManager 和 TaskManager 都是被 Yarn 啓動在 Yarn 的 Container 中。此時的 JobManager,其實應該稱之爲 Flink Application Master。也就說它的故障恢復,就徹底依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 同樣)。因爲徹底依賴了 Yarn,所以不一樣版本的 Yarn 可能會有細微的差別。這裏再也不作深究。

Flink 的 Rest API 介紹

Flink 和其餘大多開源的框架同樣,提供了不少有用的 Rest API。不過 Flink 的 RestAPI,目前還不是很強大,只能支持一些 Monitor 的功能。Flink Dashboard 自己也是經過其 Rest 來查詢各項的結果數據。在 Flink RestAPI 基礎上,能夠比較容易的將 Flink 的 Monitor 功能和其餘第三方工具相集成,這也是其設計的初衷。

在 Flink 的進程中,是由 JobManager 來提供 Rest API 的服務。所以在調用 Rest 以前,要肯定 JobManager 是否處於正常的狀態。正常狀況下,在發送一個 Rest 請求給 JobManager 以後,Client 就會收到一個 JSON 格式的返回結果。因爲目前 Rest 提供的功能還很少,須要加強這塊功能的讀者能夠在子項目 flink-runtime-web 中找到對應的代碼。其中最關鍵一個類 WebRuntimeMonitor,就是用來對全部的 Rest 請求作分流的,若是須要添加一個新類型的請求,就須要在這裏增長對應的處理代碼。下面我例舉幾個經常使用 Rest API。

1.查詢 Flink 集羣的基本信息: /overview。示例命令行格式以及返回結果以下:

1
2
$ curl http://localhost:8081/overview{"taskmanagers":1,"slots-total":16,
"slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}

2.查詢當前 Flink 集羣中的 Job 信息:/jobs。示例命令行格式以及返回結果以下:

1
2
$ curl http://localhost:8081/jobs{"jobs-running":[],"jobs-finished":
["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}

3.查詢一個指定的 Job 信息: /jobs/jobid。這個查詢的結果會返回特別多的詳細的內容,這是我在瀏覽器中進行的測試,以下圖:

圖 9. Rest 查詢具體的 Job 信息

圖 9. Rest 查詢具體的 Job 信息

想要了解更多 Rest 請求內容的讀者,能夠去 Apache Flink 的頁面中查找。因爲篇幅有限,這裏就不一一列舉。

運行 Flink 的 Workload

WordCount 的例子,就像是計算框架的 helloworld。這裏我就以 WordCount 爲例,介紹下如何在 Flink 中運行 workload。

在安裝好 Flink 的環境中,找到 Flink 的目錄。而後找到 bin/flink,它就是用來提交 Flink workload 的工具。對於 WordCount,咱們能夠直接使用已有的示例 jar 包。如運行以下的命令:

1
./bin/flink run ./examples/WordCount.jar hdfs://user/root/test hdfs://user/root/out

上面的命令是在 HDFS 中運行 WordCount,若是沒有 HDFS 用本地的文件系統也是能夠的,只須要將「hdfs://」換成「file://」。這裏咱們須要強調一種部署關係,就是 StandAlone 模式的 Flink,也是能夠直接訪問 HDFS 等分佈式文件系統的。

結束語

Flink 是一個比 Spark 起步晚的項目,可是並不表明 Flink 的前途就會暗淡。Flink 和 Spark 有不少相似之處,但也有不少明顯的差別。本文並無比較這二者之間的差別,這是將來我想與你們探討的。例如 Flink 如何更高效的管理內存,如何進一步的避免用戶程序的 OOM。在 Flink 的世界裏一切都是流,它更專一處理流應用。因爲其起步晚,加上社區的活躍度並無 Spark 那麼熱,因此其在一些細節的場景支持上,並無 Spark 那麼完善。例如目前在 SQL 的支持上並無 Spark 那麼平滑。在企業級應用中,Spark 已經開始落地,而 Flink 可能還須要一段時間的打磨。在後續文章中,我會詳細介紹如何開發 Flink 的程序,以及更多有關 Flink 內部實現的內容。

 

內容來自:https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-flink/

相關文章
相關標籤/搜索