經過WordCount來入門Flink,讀懂Flink基礎架構,Flink學習入門(一)

你們好,我是後來,我會分享我在學習和工做中遇到的點滴,但願有機會個人某篇文章可以對你有所幫助,全部的文章都會在公衆號首發,歡迎你們關注個人公衆號" 後來X大數據 ",感謝你的支持與承認。

前幾天寫的計算機網絡的網絡層在csdn閱讀量快破5000了,也給我帶來了很多的粉絲,仍是很是開心的,給了我寫文很大的動力。在這裏看一下:[
警察叔叔順着網線是怎麼找到你的?計算機網絡(四)之網絡層未完待續](https://editor.csdn.net/md/?a...java

最近公司愈來愈多的業務要用到Flink,我也正好把知識點再複習下,作到學以至用,哈哈,並且前幾天看到Flink1.11版本都開始支持hive流處理了,仍是比較興奮的。由於本身關於Flink的經驗也不是不少,因此我就再以小白的身份寫個Flink學習專欄。各位大佬不喜勿噴。linux

寫完基本知識,也會夾雜着工做實例,算是給本身作個筆記。但願某篇文章能對你有所幫助。
強烈建議:閱讀官網!
我學習一個新技術的步驟大概是這樣的:web

  1. 先了解這個技術要解決什麼問題
  2. 簡單上手體驗一下,找找自信心
  3. 學習架構,知曉原理
  4. 學習API,體驗高級玩法
  5. 搞個小項目上手,學以至用。

一、Flink大體介紹

關於實時處理與離線處理,一個很大的不一樣就在於,數據是否是有界的。apache

  1. 有界流:數據有終點,好比要對一個txt文本作wordCount。
  2. 無界流:數據有起點,沒有終點,好比說是從socket 端口拿數據計算wordCount,能夠無休止的產生數據。

而在實時處理方面,又有Flink和Spark Streaming,那麼他倆最大的區別就是Flink是真正的流處理,而spark Streaming是微批次處理。編程

  1. Flink能夠以事件爲單位,來一條數據就處理一條(能夠,不是隻能,flink也能夠 以時間窗口爲單位進行計算,你們不要誤解)
  2. Spark Streaming是以一個窗口爲單位,一次處理一批

固然flink也能夠擅長作批處理,只不過如今flink表明的更多的是實時處理。api

1.1 Flink的組件棧有哪些?

在這裏插入圖片描述
這些組件能夠先大概知道有這麼回事,而後後續的學習中一點點理解就記住了。
圖中也能知道:緩存

  1. 運行模式大體分爲3種,本地、集羣、雲
  2. DataStream API流處理
  3. DataSet API批處理
  4. 上層還支持CEP、SQL、機器學習ML、圖計算。

二、Flink初體驗

仍是先在IDE中來一個WordCount吧。這個先直接複製了,跑起來咱們再來分析其中的東西。
這個代碼是scala代碼寫的,關於建項目和導入scala框架這個你們百度吧。bash

import org.apache.Flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.Flink.streaming.api.scala._

/**
  * @description: WordCount小入門
  * @author: Liu Jun Jun
  * @create: 2020-06-04 10:10
  **/
object WordCount {
  def main(args: Array[String]): Unit = {
  //獲取環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//從客戶端獲取流式數據
    val wordDS: DataStream[String] = env.socketTextStream("bigdata101",3456)
//對數據進行轉換,按照單詞分組,最後求和
    val resultDS = wordDS.map((_,1)).keyBy(_._1).sum(1)
//對結果進行打印
    resultDS.print("ceshi:")
//真正的執行命令,前面這些都是懶加載的,只有在遇到execute纔會觸發執行
    env.execute("wordCount")
  }

}

測試——在linux系統中用netcat命令進行發送測試。沒有nc 的能夠安裝一下
(yum -y install nc)
nc -lk 3456
而後本身寫點單詞,控制檯看輸出結果:
結果展現:
在這裏插入圖片描述
在這個測試案例中,咱們已經體驗到了Flink的流式處理。
結果展現中,前面的數字表明的當前這個任務跑在個人哪一個cpu上,我這個電腦是4個cpu,默認使用所有資源,因此它本身選擇執行。固然你會發現later這個單詞總在cpu4上。微信

好了,那就繼續往下走,咱們剛只是在IDE中體驗了一把,可是咱們實際生產中仍是要打包放在集羣上跑的。網絡

那麼接下里咱們在集羣上部署一下Flink

三、Flink安裝

說到Flink的集羣安裝,就有幾種模式。本地模式通常也就是自學用,因此這裏就暫時不安裝了。來看看集羣部署。

  1. Standalone模式:這種模式下的Flink使用本身的資源管理及任務調度,不依賴於hadoop,目前使用的不是不少。
  2. yarn模式:這種模式的話,Flink更像是一個客戶端,作了一個計算引擎,把任務提交到yarn,由yarn來進行資源的分配以及任務的調度。但計算使用的是Flink引擎。使用的比較多。

以上兩種方式,根據公司需求不一樣選擇不一樣。我在這裏主要講一下yarn模式
根據官網介紹:
若是你計劃將 Apache Flink 與 Apache Hadoop 一塊兒使用(在 YARN 上運行 Flink ,鏈接到 HDFS ,鏈接到 HBase 等),則須要下載好Flink後,把hadoop組件放在Flink的lib目錄下,這個在官網有說明,若是官網沒提供你的hadoop版本,那就須要本身編譯了。
在這裏插入圖片描述
**我這裏直接提供一個Flink1.7.2版本集成好hadoop依賴的包,直接解壓部署就能夠了。
解壓值須要配置web頁面地址,固然不配置跑任務也沒有問題。 連接在個人微信公衆號【後來X大數據】,回覆」flink「就能夠直接下載。**

yarn模式也有2種類型:

  1. Session-Cluster

在這裏插入圖片描述
這種模式就是在啓動hadoop集羣后,先申請一塊空間,也就是啓動yarn-session,之後全部提交的任務都在這塊空間內執行。至關因而承包商。不以下面的方式靈活。

  1. Per-Job-Cluster

在這裏插入圖片描述
這個則是每次提交一個Flink任務,都會單獨的只申請本身所須要的空間,組成flink集羣,任務執行完就註銷掉。任務之間互相獨立,互不影響,方便管理。

很明顯,第二種方式對資源的利用更加靈活。

那麼接下來咱們提交個任務看看。咱們就用官方的WordCount測試包吧。本身寫個文件,裏面隨便寫點單詞。

bin/flink run -m yarn-cluster -yn 7 -ys 2 -p 14  -ytm 2048m -yjm 2048m  -yqu Flink ./examples/batch/WordCount.jar --input /opt/wc.txt --output /opt/output4/

bin/Flink 後面其實能夠指定不少參數,你們能夠bin/Flink --help查看一下
-m 指定任務運行模式
-yqu 指定提交任務的隊列
-n(--container):TaskManager的數量。
-s(--slots): 每一個TaskManager的slot數量,默認一個slot一個core,默認每一個taskmanager的slot的個數爲1,有時能夠多一些taskmanager,作冗餘。
-jm:JobManager的內存(單位MB)。
-tm:每一個taskmanager的內存(單位MB)。
-nm:yarn 的appName(如今yarn的ui上的名字)。
-d:後臺執行。

那一看命令這麼多參數,那咱們平時怎麼提交任務就會方便一些呢?通常狀況下咱們寫成腳本執行。

提交任務後能夠在yarn上看到這個任務,經過Application,能夠進入webUI頁面,咱們能夠看到這個任務的流程圖。(這個圖我完了補上,寫文的電腦上沒裝集羣)

四、Flink架構

經過上述的介紹,其實對Flink已經有了初步的認識。那咱們來初步理解一下Flink的架構,前期只須要大體理解就能夠了,更多的理解仍是要基於使用,畢竟實踐出真知!

4.1 Flink在運行時會包含這幾種主要的角色:

  1. Job Managers:提交的這個任務的老大,管理Flink集羣中從節點TaskManager
  2. Task Managers:單個任務的管理者,負責管理其所在節點上的資源信息,如內存、磁盤、網絡,在啓動的時候將資源的狀態向JobManager彙報
  3. Resource Manager:集羣資源的管理者
  4. Clients:提交做業的機器

上面提到的是任務在運行中有哪些具體的角色,那麼廣義上來講,Flink在這裏充當的角色更多的是一個客戶端,用來提交job。

4.2Flink on yarn時,提交任務的流程就以下圖所示:

在這裏插入圖片描述

  1. Flink提交後,Client向HDFS上傳Flink的Jar包和配置
  2. Client給RM提交任務
  3. RM分配資源並通知選中的對應的NodeManager,啓動ApplicationMaster來做爲這個任務全部資源的老大,進行管理。
  4. ApplicationMaster啓動後加載Flink的Jar包和配置構建環境,而後啓動JobManager
  5. AM向RM申請資源啓動TaskManager,AM分配資源之後,AM就通知資源所在的節點的NodeManager啓動TaskManager
  6. NodeManager加載Flink的Jar包和配置構建環境並啓動TaskManager,TaskManager啓動後向JobManager發送心跳包,並等待JobManager向其分配任務。

4.3那麼Flink任務是怎麼資源調度的?

咱們來跟着官網的思路走一下:

  1. 咱們的每個TaskManager都是一個JVM進程,這麼理解吧,TaskManager啓動在NodeManager所在的節點,就等於說是一個節點一個TaskManager。
  2. 這個JVM進程能夠在不一樣的線程中執行一個或多個 subtasks(子任務),那確定啊,一個節點同時執行不少個子任務。可是同時執行的子任務過多,是否是會搶佔資源比較嚴重,那麼幾個會比較合理呢?
  3. 在Flink中有個插槽的概念,slot: TaskManager 的一份固定資源子集。目前僅僅用來隔離task的內存,例如,具備三個 slots 的 TaskManager 會將其管理的內存資源分紅三等份給每一個 slot。這樣作的好處就是劃分到不一樣slot的子任務集不會再搶佔別的slot資源。
  4. 可是問題是:這樣也不公平,好比2個須要資源相差很大的子任務劃分到了不一樣的slot中,會出現須要資源小的任務早就跑完了,而另外一個須要資源多的任務卻遲遲跑不完。因此,Flink 容許 subtasks 共享 slots,即便它們是不一樣 tasks 的 subtasks,只要它們來自同一個 job。這樣的好處是:

(1)須要資源少的子任務能夠劃分到一個slot,而須要資源多的能夠單獨劃分到一個slot,能夠充分利用slot資源,同時確保繁重的 subtask 在 TaskManagers 之間公平地獲取資源。
(2)Flink 的並行度只要控制好合理的slot數就能夠了,由於每一個slot都是一個線程。這樣不須要計算做業總共包含多少個 tasks。
在這裏插入圖片描述
根據經驗,合理的 slots 數量應該和 CPU 核數相同。這個在實際的工做中,應在是看給本身分到的隊列的資源一共是多少,而本身預估這個任務大概須要多少資源,而後合理的設置slots數,也就是合理的設置並行度。

4.4 Flink的Slot和parallelism有什麼區別?

注意:

  1. Task Slot是靜態的概念,是指TaskManager 具備的併發執行能力 ,能夠經過參數taskmanager.numberOfTaskSlots進行配置;
  2. 而並行度parallelism是動態概念,即TaskManager運行程序時實際使用的併發能力,能夠經過參數parallelism.default進行配置。

這麼舉例子吧,咱們提交一個job,就有了一個JobManger,那麼經過資源的分配,假如如今在3個節點上執行任務,那就等於說有3個TaskManager,假如每個TaskManager都包含了必定數量的插槽(slots)。插槽的數量限制了TaskManager可以執行的任務數量。這裏假設每一個TaskManager能夠接收3個task,一共9個TaskSlot,若是咱們設置parallelism.default=1,即運行程序默認的並行度爲1,9個TaskSlot只用了1個,有8個空閒,所以,設置合適的並行度才能提升效率。

4.5 Flink的並行度是什麼?怎麼設置纔算是合理呢?

算子的並行度: 一個特定算子的子任務(subtask)的個數被稱之爲其並行度(parallelism)。一個程序中,不一樣的算子可能具備不一樣的並行度。
那麼具體的每一個算子的並行度是多少這個咱們後面具體說算子的時候再來說,這裏先大體介紹一下:

  1. One-to-one,相似於spark的窄依賴,好比map、filter等
  2. 一對多,相似於spark的寬依賴。

相同並行度的one to one操做,Flink這樣相連的算子連接在一塊兒造成一個task,原來的算子成爲裏面的一部分。將算子連接成task是很是有效的優化:它能減小線程之間的切換和基於緩存區的數據交換,在減小時延的同時提高吞吐量。

任務鏈必須知足兩個條件:one-to-one的數據傳輸而且並行度相同
job的並行度:任務被分爲多個並行任務來執行,其中每一個並行的實例處理一部分數據。這些並行實例的數量被稱爲並行度。

咱們在實際生產環境中能夠從四個不一樣層面設置並行度:(具體代碼體如今後續寫)
操做算子層面(Operator Level)
執行環境層面(Execution Environment Level)
客戶端層面(Client Level)
系統層面(System Level)
須要注意的優先級:算子層面>環境層面>客戶端層面>系統層面。

4.6 Flink的基礎編程模型

咱們在上述wordCount的代碼中,就發現數據流由3部分組成,數據源source,數據轉換,數據最後的流出sink 3部分組成。那麼這其實也是咱們代碼的主要構成,經過合適的轉換算子將數據源的數據進行處理,最後把結果經過sink的方式輸出到別的地方。
每個dataflow以一個或多個sources開始以一個或多個sinks結束。
在這裏插入圖片描述
Flink 將算子的子任務連接成 task。每一個 task 由一個線程執行。把算子連接成 tasks 可以減小線程間切換和緩衝的開銷,在下降延遲的同時提升了總體吞吐量。

4.7 Flink的流程圖有哪些?

那麼上述的數據流直接映射成的數據流圖是StreamGraph,也被稱爲邏輯流圖,由於它們表示的是計算邏輯的高級視圖。爲了執行一個流處理程序,Flink須要將邏輯流圖轉換爲物理數據流圖(也叫執行圖),詳細說明程序的執行方式。
Flink 中的執行圖能夠分紅四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖。

  1. StreamGraph:是根據用戶經過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結構。
  2. JobGraph:StreamGraph通過優化後生成了 JobGraph,提交給 JobManager 的數據結構。主要的優化爲,將多個符合條件的節點 chain 在一塊兒做爲一個節點,這樣能夠減小數據在節點之間流動所須要的序列化/反序列化/傳輸消耗。
  3. ExecutionGraph:JobManager 根據 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的並行化版本,是調度層最核心的數據結構。
  4. 物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行調度後,在各個TaskManager 上部署 Task 後造成的「圖」,並非一個具體的數據結構。

這幾個類在源碼中也能找到。

關於Flink比較基礎的框架概念已經瞭解的差很少了,部份內容也來源於官網中文翻譯。
可能有些概念還沒理解透徹,不過不要緊,在接下來的應用中,使用的多了就會有不同的收穫,指望經過一篇文章或者只看官網的介紹理解透徹是不存在的,畢竟這框架是衆多大牛彙集在一塊兒搞了不少年才搞出來的,咱們只不過是個框架的使用者,約等於搬磚工。

因此我寫的內容也歡迎你們前來討論。

那麼下一篇Flink的文章我來繼續學習關於Flink的API。期待能和你一塊兒學習!

掃碼關注」後來X大數據「,回覆【電子書】,領取【超多本pdf java及大數據 電子書】

在這裏插入圖片描述

相關文章
相關標籤/搜索