你們好,我是後來,我會分享我在學習和工做中遇到的點滴,但願有機會個人某篇文章可以對你有所幫助,全部的文章都會在公衆號首發,歡迎你們關注個人公衆號" 後來X大數據 ",感謝你的支持與承認。
前幾天寫的計算機網絡的網絡層在csdn閱讀量快破5000了,也給我帶來了很多的粉絲,仍是很是開心的,給了我寫文很大的動力。在這裏看一下:[
警察叔叔順着網線是怎麼找到你的?計算機網絡(四)之網絡層未完待續](https://editor.csdn.net/md/?a...java
最近公司愈來愈多的業務要用到Flink,我也正好把知識點再複習下,作到學以至用,哈哈,並且前幾天看到Flink1.11版本都開始支持hive流處理了,仍是比較興奮的。由於本身關於Flink的經驗也不是不少,因此我就再以小白的身份寫個Flink學習專欄。各位大佬不喜勿噴。linux
寫完基本知識,也會夾雜着工做實例,算是給本身作個筆記。但願某篇文章能對你有所幫助。
強烈建議:閱讀官網!
我學習一個新技術的步驟大概是這樣的:web
關於實時處理與離線處理,一個很大的不一樣就在於,數據是否是有界的。apache
而在實時處理方面,又有Flink和Spark Streaming,那麼他倆最大的區別就是Flink是真正的流處理,而spark Streaming是微批次處理。編程
固然flink也能夠擅長作批處理,只不過如今flink表明的更多的是實時處理。api
這些組件能夠先大概知道有這麼回事,而後後續的學習中一點點理解就記住了。
圖中也能知道:緩存
仍是先在IDE中來一個WordCount吧。這個先直接複製了,跑起來咱們再來分析其中的東西。
這個代碼是scala代碼寫的,關於建項目和導入scala框架這個你們百度吧。bash
import org.apache.Flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.Flink.streaming.api.scala._ /** * @description: WordCount小入門 * @author: Liu Jun Jun * @create: 2020-06-04 10:10 **/ object WordCount { def main(args: Array[String]): Unit = { //獲取環境 val env = StreamExecutionEnvironment.getExecutionEnvironment //從客戶端獲取流式數據 val wordDS: DataStream[String] = env.socketTextStream("bigdata101",3456) //對數據進行轉換,按照單詞分組,最後求和 val resultDS = wordDS.map((_,1)).keyBy(_._1).sum(1) //對結果進行打印 resultDS.print("ceshi:") //真正的執行命令,前面這些都是懶加載的,只有在遇到execute纔會觸發執行 env.execute("wordCount") } }
測試——在linux系統中用netcat命令進行發送測試。沒有nc 的能夠安裝一下
(yum -y install nc)
nc -lk 3456
而後本身寫點單詞,控制檯看輸出結果:
結果展現:
在這個測試案例中,咱們已經體驗到了Flink的流式處理。
結果展現中,前面的數字表明的當前這個任務跑在個人哪一個cpu上,我這個電腦是4個cpu,默認使用所有資源,因此它本身選擇執行。固然你會發現later這個單詞總在cpu4上。微信
好了,那就繼續往下走,咱們剛只是在IDE中體驗了一把,可是咱們實際生產中仍是要打包放在集羣上跑的。網絡
那麼接下里咱們在集羣上部署一下Flink
說到Flink的集羣安裝,就有幾種模式。本地模式通常也就是自學用,因此這裏就暫時不安裝了。來看看集羣部署。
以上兩種方式,根據公司需求不一樣選擇不一樣。我在這裏主要講一下yarn模式
根據官網介紹:
若是你計劃將 Apache Flink 與 Apache Hadoop 一塊兒使用(在 YARN 上運行 Flink ,鏈接到 HDFS ,鏈接到 HBase 等),則須要下載好Flink後,把hadoop組件放在Flink的lib目錄下,這個在官網有說明,若是官網沒提供你的hadoop版本,那就須要本身編譯了。
**我這裏直接提供一個Flink1.7.2版本集成好hadoop依賴的包,直接解壓部署就能夠了。
解壓值須要配置web頁面地址,固然不配置跑任務也沒有問題。 連接在個人微信公衆號【後來X大數據】,回覆」flink「就能夠直接下載。**
yarn模式也有2種類型:
這種模式就是在啓動hadoop集羣后,先申請一塊空間,也就是啓動yarn-session,之後全部提交的任務都在這塊空間內執行。至關因而承包商。不以下面的方式靈活。
這個則是每次提交一個Flink任務,都會單獨的只申請本身所須要的空間,組成flink集羣,任務執行完就註銷掉。任務之間互相獨立,互不影響,方便管理。
很明顯,第二種方式對資源的利用更加靈活。
那麼接下來咱們提交個任務看看。咱們就用官方的WordCount測試包吧。本身寫個文件,裏面隨便寫點單詞。
bin/flink run -m yarn-cluster -yn 7 -ys 2 -p 14 -ytm 2048m -yjm 2048m -yqu Flink ./examples/batch/WordCount.jar --input /opt/wc.txt --output /opt/output4/
bin/Flink 後面其實能夠指定不少參數,你們能夠bin/Flink --help查看一下
-m 指定任務運行模式
-yqu 指定提交任務的隊列
-n(--container):TaskManager的數量。
-s(--slots): 每一個TaskManager的slot數量,默認一個slot一個core,默認每一個taskmanager的slot的個數爲1,有時能夠多一些taskmanager,作冗餘。
-jm:JobManager的內存(單位MB)。
-tm:每一個taskmanager的內存(單位MB)。
-nm:yarn 的appName(如今yarn的ui上的名字)。
-d:後臺執行。
那一看命令這麼多參數,那咱們平時怎麼提交任務就會方便一些呢?通常狀況下咱們寫成腳本執行。
提交任務後能夠在yarn上看到這個任務,經過Application,能夠進入webUI頁面,咱們能夠看到這個任務的流程圖。(這個圖我完了補上,寫文的電腦上沒裝集羣)
經過上述的介紹,其實對Flink已經有了初步的認識。那咱們來初步理解一下Flink的架構,前期只須要大體理解就能夠了,更多的理解仍是要基於使用,畢竟實踐出真知!
上面提到的是任務在運行中有哪些具體的角色,那麼廣義上來講,Flink在這裏充當的角色更多的是一個客戶端,用來提交job。
咱們來跟着官網的思路走一下:
(1)須要資源少的子任務能夠劃分到一個slot,而須要資源多的能夠單獨劃分到一個slot,能夠充分利用slot資源,同時確保繁重的 subtask 在 TaskManagers 之間公平地獲取資源。
(2)Flink 的並行度只要控制好合理的slot數就能夠了,由於每一個slot都是一個線程。這樣不須要計算做業總共包含多少個 tasks。
根據經驗,合理的 slots 數量應該和 CPU 核數相同。這個在實際的工做中,應在是看給本身分到的隊列的資源一共是多少,而本身預估這個任務大概須要多少資源,而後合理的設置slots數,也就是合理的設置並行度。
注意:
這麼舉例子吧,咱們提交一個job,就有了一個JobManger,那麼經過資源的分配,假如如今在3個節點上執行任務,那就等於說有3個TaskManager,假如每個TaskManager都包含了必定數量的插槽(slots)。插槽的數量限制了TaskManager可以執行的任務數量。這裏假設每一個TaskManager能夠接收3個task,一共9個TaskSlot,若是咱們設置parallelism.default=1,即運行程序默認的並行度爲1,9個TaskSlot只用了1個,有8個空閒,所以,設置合適的並行度才能提升效率。
算子的並行度: 一個特定算子的子任務(subtask)的個數被稱之爲其並行度(parallelism)。一個程序中,不一樣的算子可能具備不一樣的並行度。
那麼具體的每一個算子的並行度是多少這個咱們後面具體說算子的時候再來說,這裏先大體介紹一下:
相同並行度的one to one操做,Flink這樣相連的算子連接在一塊兒造成一個task,原來的算子成爲裏面的一部分。將算子連接成task是很是有效的優化:它能減小線程之間的切換和基於緩存區的數據交換,在減小時延的同時提高吞吐量。
任務鏈必須知足兩個條件:one-to-one的數據傳輸而且並行度相同
job的並行度:任務被分爲多個並行任務來執行,其中每一個並行的實例處理一部分數據。這些並行實例的數量被稱爲並行度。
咱們在實際生產環境中能夠從四個不一樣層面設置並行度:(具體代碼體如今後續寫)
操做算子層面(Operator Level)
執行環境層面(Execution Environment Level)
客戶端層面(Client Level)
系統層面(System Level)
須要注意的優先級:算子層面>環境層面>客戶端層面>系統層面。
咱們在上述wordCount的代碼中,就發現數據流由3部分組成,數據源source,數據轉換,數據最後的流出sink 3部分組成。那麼這其實也是咱們代碼的主要構成,經過合適的轉換算子將數據源的數據進行處理,最後把結果經過sink的方式輸出到別的地方。
每個dataflow以一個或多個sources開始以一個或多個sinks結束。
Flink 將算子的子任務連接成 task。每一個 task 由一個線程執行。把算子連接成 tasks 可以減小線程間切換和緩衝的開銷,在下降延遲的同時提升了總體吞吐量。
那麼上述的數據流直接映射成的數據流圖是StreamGraph,也被稱爲邏輯流圖,由於它們表示的是計算邏輯的高級視圖。爲了執行一個流處理程序,Flink須要將邏輯流圖轉換爲物理數據流圖(也叫執行圖),詳細說明程序的執行方式。
Flink 中的執行圖能夠分紅四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖。
這幾個類在源碼中也能找到。
關於Flink比較基礎的框架概念已經瞭解的差很少了,部份內容也來源於官網中文翻譯。
可能有些概念還沒理解透徹,不過不要緊,在接下來的應用中,使用的多了就會有不同的收穫,指望經過一篇文章或者只看官網的介紹理解透徹是不存在的,畢竟這框架是衆多大牛彙集在一塊兒搞了不少年才搞出來的,咱們只不過是個框架的使用者,約等於搬磚工。
因此我寫的內容也歡迎你們前來討論。
那麼下一篇Flink的文章我來繼續學習關於Flink的API。期待能和你一塊兒學習!